This is a nice progress. good to see we have type safe fluent API.

Could you please explain a little bit more how typesafe would help generic
purpose monitoring?

In general purpose monitoring, do we need input type/output type defined?
How user describe their types from source to each processing element, and
how framework interpret user declared type into EAGLE internal type?

Thanks
Edward

On 12/6/15, 23:47, "Hao Chen" <[email protected]> wrote:

>JIRA: https://issues.apache.org/jira/browse/EAGLE-66
>Pull Request: https://github.com/apache/incubator-eagle/pull/17
>
>*Main Changes*
>
>1. *Typesafe API:* Currently the stream processing API is not type-safe
>(Type info are erased by stream framework), all programming interfaces for
>developer are faced to Object/AnyRef, which is not very friendly and
>extensible for framework user (application developer):
>
>public void flatMap(java.util.List<*Object*> input,
>Collector<Tuple2<String, AlertAPIEntity>> outputCollector)
>
>So i propose the interface as (all type information are transparent for
>developer, not need additional parameters, supported by Scala implicit
>TypeTag)
>
>*class StreamProducer[+T <: Any] extends StreamInfo with
>StreamProtocol[T]*
>
>   - *StreamInfo: * contains Stream core information including streamId,
>   processing element id/name, entity type info (class/TypeTag)
>   - *StreamProtocol extends JavaStreamProtocol*: contains basic Stream
>DSL
>   API and java-compatible API
>
>class *StreamInfo*  extends Serializable{
>  val id:Int = UniqueId.incrementAndGetId()
>  var name: String = null
>  var streamId:String=null
>  var parallelismNum: Int = 1
>  var inKeyed:Boolean = false
>  var outKeyed:Boolean = false
>  var keySelector:KeySelector = null
>
>  var typeClass:Class[_] = classOf[AnyRef]
>  @transient  implicit var *typeTag:ru.TypeTag[*_] = ru.typeTag[AnyRef]
>}
>
>2. *KeyValue Based Structure: *currently framework user (developer) have
>to
>handle with field declaration again and again, and framework and business
>logic are highly coupled, according to the StreamProtocol description,
>user
>should not care about framework level detail like internal data structure
>for Storm using List<Object> with Fields<String> which is not friendly for
>developer, we should make sure user focus on business logic only like:
>
>env.from(tuples)
>.groupByKey(_.name)
>
>3. *Spout grouping instead of overriding Schema*: currently especially in
>use case like HdfsAuditLog Monitoring, if developer wants to  groupby
>certain key, they are forced to override Schema (specific for storm) ,
>which is not good and un-reusable.
>
>4. *Environment Decoupled: *currently the stream (metadata) /dag (logic) /
>environment (execution) are coupled with storm internal implementation,
>which is not good for becoming a metadata-driven pipeline framework in
>future, so we should decouple it.
>
>*5. Compatible with Field-based Structure in old framework and
>application.*
>
>*Sample Application*
>
>case class Entity(name:String,value:Double,var inc:Int=0)
>
>val tuples = Seq(
>Entity("a", 1),
>Entity("a", 2),
>Entity("a", 3),
>Entity("b", 2),
>Entity("c", 3),
>Entity("d", 3)
>)
>
>val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
>
>
>*// DAG is fully automatically aware: Entity -> Tuple2 -> Tuple3 *
>
>env.from(tuples)
>*.groupByKey(_.name)*
>.map(o => {o.inc += 2;o})
>.filter(_.name != "b")
>.filter(_.name != "c")
>*.groupByKey(o=>(o.name <http://o.name>,o.value))*
>.map(o => (o.name,o))
>.map(o => (o._1,o._2.value,o._2.inc))
>.foreach(println)
>
> env.execute()
>
>
>*Type is transparent for developer during both DAG compiling (programming)
>and runtime (metadata) phases*
>
>2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before
>expanded DAG
>{
>IterableStreamProducer[*Entity*]_1{1} ->
>GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
>GroupByKeyProducer[<function1>(*Entity*)]_2{1} -> MapProducer[Entity]_3{1}
>in shuffleGroup
>MapProducer[*Entity]*_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
>FilterProducer[*Entity*]_4{1} -> FilterProducer[Entity]_5{1} in
>shuffleGroup
>FilterProducer[*Entity*]_5{1} ->
>GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
>GroupByKeyProducer[<function1>(*Entity*)]_6{1} -> MapProducer[Tuple2]_7{1}
>in shuffleGroup
>MapProducer[*Tuple2*]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
>MapProducer[*Tuple3*]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
>}
>2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After
>expanded
>DAG
>{
>IterableStreamProducer[*Entity*]_1{1} -> MapProducer[Entity]_3{1} in
>groupByKey(<function1>)
>MapProducer[*Entity*]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
>FilterProducer[*Entity*]_4{1} -> FilterProducer[Entity]_5{1} in
>shuffleGroup
>FilterProducer[*Entity*]_5{1} -> MapProducer[Tuple2]_7{1} in
>groupByKey(<function1>)
>MapProducer[*Tuple2*]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
>MapProducer[*Tuple3*]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
>}
>2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]:
>Storm
>topology DAG
>{
>  Spout[IterableStreamProducer[*Entity*]_1]{1} ->
>Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
>Bolt[MapProducer[*Entity*]_3 ]{1} -> Bolt[FilterProducer[Entity]_4]{1} in
>shuffleGroup
>Bolt[FilterProducer[*Entity*]_4 ]{1} -> Bolt[FilterProducer[Entity]_5]{1}
>in shuffleGroup
>Bolt[FilterProducer[*Entity*]_5 ]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in
>groupByKey(<function1>)
>Bolt[MapProducer[*Tuple2*]_7 ]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in
>shuffleGroup
>Bolt[MapProducer[*Tuple3*]_8 ]{1} -> Bolt[ForeachProducer[void]_9]{1} in
>shuffleGroup
>}
>
>
>Regards,
>Hao

Reply via email to