Edward, the type-safe API is currently aimed mainly at a better development experience, not yet at fully general-purpose monitoring.
For example, when a developer needs to implement groupBy "user", the implementation has to override a lot of Storm-specific code, e.g.:

https://github.com/haoch/incubator-eagle/blob/master/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java#L64-L82

which is not good at all. A better way would look like:

https://github.com/haoch/incubator-eagle/blob/EAGLE-66/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala#L83-L86

env.fromSpout[String](TestSpout()) // Declare the stream type if not given
  .groupByKey(_.charAt(0))         // Especially this line: pass a type-based key selector
  .foreach(println)
env.execute()

For general-purpose monitoring, especially advanced monitoring cases, I think the first step is to provide this kind of end-user-oriented API, so that users can customize the logic easily.

Another benefit is that after separating the API into:

Stream = StreamInfo (Type) + StreamProtocol (Connector)

we can more easily declare the DAG in pure metadata:

2015-12-14 17:53:49,891 INFO [main] utils.GraphPrinter$[56]: Print DOT digraph (copy and visualize with http://www.webgraphviz.com/)
digraph dag {
  "kafkaMsgConsumer x 1" -> "HdfsUserCommandReassembler_3 x 1" [label = "groupByStrategy( org.apache.eagle.partition.PartitionStrategyImpl@43eb13dc )"];
  "kafkaMsgConsumer x 1" -> "FileSensitivityDataJoinExecutor_6 x 1" [label = "groupByStrategy( org.apache.eagle.partition.PartitionStrategyImpl@43eb13dc )"];
  "HdfsUserCommandReassembler_3 x 1" -> "FileSensitivityDataJoinExecutor_6 x 1" [label = "groupByFields( Buffer(0) )"];
  "FileSensitivityDataJoinExecutor_6 x 1" -> "JavaStormExecutorForAlertWrapper_10 x 1" [label = "groupByFields( Buffer(0) )"];
  "JavaStormExecutorForAlertWrapper_10 x 1" -> "hdfsAuditLogAlertExecutor_0 x 1" [label = "groupByFields( List(0) )"];
  "hdfsAuditLogAlertExecutor_0 x 1" -> "AlertEntityDeduplicationExecutor_12 x 1" [label = "shuffleGroup"];
  "hdfsAuditLogAlertExecutor_0 x 1" -> "AlertEmailDeduplicationExecutor_14 x 1" [label = "shuffleGroup"];
  "AlertEntityDeduplicationExecutor_12 x 1" -> "AlertPersistExecutor_13 x 1" [label = "shuffleGroup"];
  "AlertEmailDeduplicationExecutor_14 x 1" -> "AlertNotificationExecutor_15 x 1" [label = "shuffleGroup"];
}

The next step toward general-purpose monitoring is essentially reverse engineering from this metadata.

Regards,
Hao

On Mon, Dec 14, 2015 at 1:20 PM, Zhang, Edward (GDI Hadoop) <
[email protected]> wrote:

> This is nice progress; good to see we have a type-safe fluent API.
>
> Could you please explain a little more how type safety helps
> general-purpose monitoring?
>
> In general-purpose monitoring, do we need input/output types defined? How
> does the user describe their types from the source through each processing
> element, and how does the framework interpret user-declared types into
> Eagle's internal types?
>
> Thanks
> Edward
>
> On 12/6/15, 23:47, "Hao Chen" <[email protected]> wrote:
>
> >JIRA: https://issues.apache.org/jira/browse/EAGLE-66
> >Pull Request: https://github.com/apache/incubator-eagle/pull/17
> >
> >*Main Changes*
> >
> >1. *Typesafe API:* Currently the stream processing API is not type-safe
> >(type info is erased by the stream framework); all programming interfaces
> >expose Object/AnyRef to the framework user (application developer), which
> >is neither friendly nor extensible:
> >
> >public void flatMap(java.util.List<*Object*> input,
> >Collector<Tuple2<String, AlertAPIEntity>> outputCollector)
> >
> >So I propose the interface as follows (all type information is
> >transparent to the developer, no additional parameters needed, supported
> >by Scala's implicit TypeTag):
> >
> >*class StreamProducer[+T <: Any] extends StreamInfo with
> >StreamProtocol[T]*
> >
> > - *StreamInfo:* contains the stream's core information, including
> >   streamId, processing element id/name, and entity type info
> >   (class/TypeTag)
> > - *StreamProtocol extends JavaStreamProtocol*: contains the basic
> >   Stream DSL API and a Java-compatible API
> >
> >class *StreamInfo* extends Serializable {
> >  val id: Int = UniqueId.incrementAndGetId()
> >  var name: String = null
> >  var streamId: String = null
> >  var parallelismNum: Int = 1
> >  var inKeyed: Boolean = false
> >  var outKeyed: Boolean = false
> >  var keySelector: KeySelector = null
> >
> >  var typeClass: Class[_] = classOf[AnyRef]
> >  @transient implicit var *typeTag:* ru.TypeTag[_] = ru.typeTag[AnyRef]
> >}
> >
> >2. *KeyValue Based Structure:* Currently the framework user (developer)
> >has to deal with field declarations again and again, so the framework
> >and business logic are highly coupled. According to the StreamProtocol
> >description, the user should not care about framework-level details such
> >as Storm's internal data structure (List<Object> with Fields<String>),
> >which is not friendly to developers; we should make sure the user
> >focuses on business logic only, like:
> >
> >env.from(tuples)
> >.groupByKey(_.name)
> >
> >3. *Spout grouping instead of overriding Schema:* Currently, especially
> >in use cases like HDFS audit log monitoring, if a developer wants to
> >group by a certain key, they are forced to override the Schema
> >(Storm-specific), which is neither good nor reusable.
> >
> >4. *Environment Decoupled:* Currently the stream (metadata) / DAG
> >(logic) / environment (execution) are coupled with Storm's internal
> >implementation, which blocks us from becoming a metadata-driven pipeline
> >framework in the future, so we should decouple them.
> >
> >*5. Compatible with the field-based structure in the old framework and
> >applications.*
> >
> >*Sample Application*
> >
> >case class Entity(name: String, value: Double, var inc: Int = 0)
> >
> >val tuples = Seq(
> >  Entity("a", 1),
> >  Entity("a", 2),
> >  Entity("a", 3),
> >  Entity("b", 2),
> >  Entity("c", 3),
> >  Entity("d", 3)
> >)
> >
> >val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
> >
> >*// DAG is fully automatically aware: Entity -> Tuple2 -> Tuple3*
> >
> >env.from(tuples)
> >*.groupByKey(_.name)*
> >.map(o => { o.inc += 2; o })
> >.filter(_.name != "b")
> >.filter(_.name != "c")
> >*.groupByKey(o => (o.name, o.value))*
> >.map(o => (o.name, o))
> >.map(o => (o._1, o._2.value, o._2.inc))
> >.foreach(println)
> >
> >env.execute()
> >
> >*Type is transparent to the developer during both DAG compiling
> >(programming) and runtime (metadata) phases*
> >
> >2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before
> >expanded DAG
> >{
> >IterableStreamProducer[*Entity*]_1{1} ->
> >GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
> >GroupByKeyProducer[<function1>(*Entity*)]_2{1} -> MapProducer[Entity]_3{1}
> >in shuffleGroup
> >MapProducer[*Entity*]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
> >FilterProducer[*Entity*]_4{1} -> FilterProducer[Entity]_5{1} in
> >shuffleGroup
> >FilterProducer[*Entity*]_5{1} ->
> >GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
> >GroupByKeyProducer[<function1>(*Entity*)]_6{1} -> MapProducer[Tuple2]_7{1}
> >in shuffleGroup
> >MapProducer[*Tuple2*]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
> >MapProducer[*Tuple3*]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
> >}
> >2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After
> >expanded DAG
> >{
> >IterableStreamProducer[*Entity*]_1{1} -> MapProducer[Entity]_3{1} in
> >groupByKey(<function1>)
> >MapProducer[*Entity*]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
> >FilterProducer[*Entity*]_4{1} -> FilterProducer[Entity]_5{1} in
> >shuffleGroup
> >FilterProducer[*Entity*]_5{1} -> MapProducer[Tuple2]_7{1} in
> >groupByKey(<function1>)
> >MapProducer[*Tuple2*]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
> >MapProducer[*Tuple3*]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
> >}
> >2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]:
> >Storm topology DAG
> >{
> >Spout[IterableStreamProducer[*Entity*]_1]{1} ->
> >Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
> >Bolt[MapProducer[*Entity*]_3]{1} -> Bolt[FilterProducer[Entity]_4]{1} in
> >shuffleGroup
> >Bolt[FilterProducer[*Entity*]_4]{1} -> Bolt[FilterProducer[Entity]_5]{1}
> >in shuffleGroup
> >Bolt[FilterProducer[*Entity*]_5]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in
> >groupByKey(<function1>)
> >Bolt[MapProducer[*Tuple2*]_7]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in
> >shuffleGroup
> >Bolt[MapProducer[*Tuple3*]_8]{1} -> Bolt[ForeachProducer[void]_9]{1} in
> >shuffleGroup
> >}
> >
> >Regards,
> >Hao
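
P.S. For readers outside this thread, the core idea of the type-transparent groupByKey can be sketched with plain Scala collections. This is only an illustration under assumed names (the `Stream` mini-DSL below is hypothetical, NOT the actual Eagle StreamProducer API): each operator carries the element type `T` through the pipeline, so the key selector and all downstream operators are fully typed, with no `List[Object]` or field-name declarations.

```scala
// Minimal sketch of a type-safe fluent stream DSL (hypothetical, not Eagle's API).
case class Entity(name: String, value: Double, var inc: Int = 0)

class Stream[T](val elems: Seq[T]) {
  // Typed key selector: the compiler knows T, so keyOf is checked statically.
  // Here we only cluster elements by key to mimic partition-local grouping;
  // a real engine would route tuples to partitions by the key.
  def groupByKey[K](keyOf: T => K): Stream[T] =
    new Stream(elems.groupBy(keyOf).values.flatten.toSeq)

  // The element type may change along the pipeline (Entity -> Tuple3, etc.).
  def map[R](f: T => R): Stream[R] = new Stream(elems.map(f))

  def filter(p: T => Boolean): Stream[T] = new Stream(elems.filter(p))

  def foreach(f: T => Unit): Unit = elems.foreach(f)
}

object TypeSafeSketch extends App {
  val tuples = Seq(Entity("a", 1), Entity("a", 2), Entity("b", 2), Entity("c", 3))
  new Stream(tuples)
    .groupByKey(_.name)                 // key selector typed as Entity => String
    .map(o => { o.inc += 2; o })        // still Stream[Entity], no casts
    .filter(_.name != "b")
    .map(o => (o.name, o.value, o.inc)) // now Stream[(String, Double, Int)]
    .foreach(println)
}
```

The point of the sketch is that no field names or schemas appear anywhere: the type flows through `groupByKey`, `map`, and `filter` automatically, which is what the proposed StreamProducer[+T] interface achieves via Scala's implicit TypeTag.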
