Edward,

The type-safe API is currently mainly about a better development
experience; it is not yet a fully general-purpose monitoring solution.

For example, when a developer needs to group the stream by "user", the
implementation has to override a lot of Storm-specific code like:

https://github.com/haoch/incubator-eagle/blob/master/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java#L64-L82

which is not good at all.

A better way looks like this:
https://github.com/haoch/incubator-eagle/blob/EAGLE-66/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala#L83-L86

  env.fromSpout[String](TestSpout()) // Declare the stream type if not given
    .groupByKey(_.charAt(0))         // Pass a type-based key selector here
    .foreach(println)
  env.execute()

For general-purpose monitoring, especially advanced monitoring cases, I
think the first step is to provide this kind of end-user-oriented API, so
that users can customize the logic easily.
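
For illustration, here is a self-contained sketch (plain Scala collections,
no Eagle or Storm dependency; AuditEvent is a made-up class, not a real
Eagle type) of how such end-user logic could read:

```scala
// Self-contained sketch of end-user monitoring logic with type-based
// grouping; AuditEvent is an illustrative placeholder, not a real Eagle class.
case class AuditEvent(user: String, cmd: String, allowed: Boolean)

val events = Seq(
  AuditEvent("alice", "open", allowed = true),
  AuditEvent("bob", "delete", allowed = false),
  AuditEvent("bob", "rename", allowed = false)
)

// Count denied operations per user, keyed by a typed function rather than
// a Storm field declaration.
val deniedByUser: Map[String, Int] =
  events.filterNot(_.allowed)
    .groupBy(_.user)
    .map { case (user, es) => user -> es.size }
```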

Another benefit is that after separating the API into Stream = StreamInfo
(Type) + StreamProtocol (Connector), we can declare the DAG in pure
metadata much more easily:

2015-12-14 17:53:49,891 INFO [main] utils.GraphPrinter$[56]: Print DOT digraph (copy and visualize with http://www.webgraphviz.com/)

digraph dag {
"kafkaMsgConsumer x 1" -> "HdfsUserCommandReassembler_3 x 1" [label = "groupByStrategy( org.apache.eagle.partition.PartitionStrategyImpl@43eb13dc )"];
"kafkaMsgConsumer x 1" -> "FileSensitivityDataJoinExecutor_6 x 1" [label = "groupByStrategy( org.apache.eagle.partition.PartitionStrategyImpl@43eb13dc )"];
"HdfsUserCommandReassembler_3 x 1" -> "FileSensitivityDataJoinExecutor_6 x 1" [label = "groupByFields( Buffer(0) )"];
"FileSensitivityDataJoinExecutor_6 x 1" -> "JavaStormExecutorForAlertWrapper_10 x 1" [label = "groupByFields( Buffer(0) )"];
"JavaStormExecutorForAlertWrapper_10 x 1" -> "hdfsAuditLogAlertExecutor_0 x 1" [label = "groupByFields( List(0) )"];
"hdfsAuditLogAlertExecutor_0 x 1" -> "AlertEntityDeduplicationExecutor_12 x 1" [label = "shuffleGroup"];
"hdfsAuditLogAlertExecutor_0 x 1" -> "AlertEmailDeduplicationExecutor_14 x 1" [label = "shuffleGroup"];
"AlertEntityDeduplicationExecutor_12 x 1" -> "AlertPersistExecutor_13 x 1" [label = "shuffleGroup"];
"AlertEmailDeduplicationExecutor_14 x 1" -> "AlertNotificationExecutor_15 x 1" [label = "shuffleGroup"];
}
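
The printing step itself is simple once the DAG lives in metadata; a
minimal standalone sketch (this is an illustration, not the actual
GraphPrinter code, and Edge is a made-up class):

```scala
// Minimal sketch of rendering a metadata DAG as a DOT digraph; an
// illustration only, not the actual GraphPrinter implementation.
case class Edge(from: String, to: String, grouping: String)

def toDot(edges: Seq[Edge]): String = {
  val body = edges
    .map(e => s""""${e.from}" -> "${e.to}" [label = "${e.grouping}"];""")
    .mkString("\n")
  s"digraph dag {\n$body\n}"
}

val dot = toDot(Seq(
  Edge("kafkaMsgConsumer x 1", "hdfsAuditLogAlertExecutor_0 x 1", "groupByFields( List(0) )"),
  Edge("hdfsAuditLogAlertExecutor_0 x 1", "AlertPersistExecutor_13 x 1", "shuffleGroup")
))
```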

The next step toward implementing general monitoring is then just reverse
engineering from that metadata.
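
As a rough sketch of that reverse direction (illustrative names only, not
real Eagle APIs): rebuild an adjacency view from edge metadata, which is
what a topology compiler would walk to wire the DAG back up:

```scala
// Rough sketch of the reverse direction: recover each node's downstream
// nodes from edge metadata; names are illustrative, not real Eagle APIs.
val edges = Seq(
  ("kafkaMsgConsumer", "hdfsAuditLogAlertExecutor_0"),
  ("hdfsAuditLogAlertExecutor_0", "AlertPersistExecutor_13"),
  ("hdfsAuditLogAlertExecutor_0", "AlertNotificationExecutor_15")
)

val downstream: Map[String, Seq[String]] =
  edges.groupBy(_._1).map { case (src, es) => src -> es.map(_._2) }
```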

Regards,
Hao

On Mon, Dec 14, 2015 at 1:20 PM, Zhang, Edward (GDI Hadoop) <
[email protected]> wrote:

> This is nice progress. Good to see we have a type-safe fluent API.
>
> Could you please explain a little bit more how type safety would help
> general-purpose monitoring?
>
> In general-purpose monitoring, do we need input/output types defined? How
> do users describe their types from the source to each processing element,
> and how does the framework interpret user-declared types into EAGLE
> internal types?
>
> Thanks
> Edward
>
> On 12/6/15, 23:47, "Hao Chen" <[email protected]> wrote:
>
> >JIRA: https://issues.apache.org/jira/browse/EAGLE-66
> >Pull Request: https://github.com/apache/incubator-eagle/pull/17
> >
> >*Main Changes*
> >
> >1. *Typesafe API:* Currently the stream processing API is not type-safe
> >(type info is erased by the stream framework); all programming interfaces
> >expose Object/AnyRef to the developer, which is not very friendly or
> >extensible for framework users (application developers):
> >
> >public void flatMap(java.util.List<*Object*> input,
> >Collector<Tuple2<String, AlertAPIEntity>> outputCollector)
> >
> >So I propose the interface below (all type information is transparent to
> >the developer, no additional parameters needed, supported by Scala's
> >implicit TypeTag):
> >
> >*class StreamProducer[+T <: Any] extends StreamInfo with
> >StreamProtocol[T]*
> >
> >   - *StreamInfo:* contains the stream's core information, including
> >   streamId, processing element id/name, and entity type info
> >   (class/TypeTag)
> >   - *StreamProtocol extends JavaStreamProtocol:* contains the basic
> >   stream DSL API and the Java-compatible API
> >
> >class *StreamInfo* extends Serializable {
> >  val id: Int = UniqueId.incrementAndGetId()
> >  var name: String = null
> >  var streamId: String = null
> >  var parallelismNum: Int = 1
> >  var inKeyed: Boolean = false
> >  var outKeyed: Boolean = false
> >  var keySelector: KeySelector = null
> >
> >  var typeClass: Class[_] = classOf[AnyRef]
> >  @transient implicit var *typeTag*: ru.TypeTag[_] = ru.typeTag[AnyRef]
> >}
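
To make the TypeTag mechanism concrete, here is a minimal standalone
sketch (a simplified stand-in, not the real StreamInfo) of capturing the
element type implicitly so it survives erasure:

```scala
import scala.reflect.runtime.{universe => ru}

// Simplified stand-in for StreamInfo: the compiler fills in the TypeTag
// implicitly, so the element type is still known at runtime despite erasure.
class TypedInfo[T](implicit val typeTag: ru.TypeTag[T]) {
  def typeName: String = typeTag.tpe.toString
}

val info = new TypedInfo[String]
// info.typeName is "String"
```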
> >
> >2. *KeyValue Based Structure:* currently framework users (developers)
> >have to deal with field declarations again and again, and framework and
> >business logic are highly coupled. According to the StreamProtocol
> >description, users should not have to care about framework-level details
> >such as Storm's internal data structure (List<Object> with
> >Fields<String>), which is not developer-friendly; we should make sure
> >users focus on business logic only, like:
> >
> >env.from(tuples)
> >.groupByKey(_.name)
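
A tiny standalone sketch of what a type-based key selector buys
(illustrative names only; plain Scala, no framework, and TypedKeySelector
is a made-up trait): the grouping key comes from a typed function on the
entity, not from string field names.

```scala
// Illustrative sketch: the grouping key is derived from a typed function
// on the entity; TypedKeySelector is a made-up trait, not an Eagle API.
case class Metric(name: String, value: Double)

trait TypedKeySelector[T] { def key(t: T): Any }

val byName = new TypedKeySelector[Metric] { def key(m: Metric): Any = m.name }

val grouped: Map[Any, Seq[Metric]] =
  Seq(Metric("cpu", 0.9), Metric("cpu", 0.7), Metric("mem", 0.4))
    .groupBy(byName.key)
```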
> >
> >3. *Spout grouping instead of overriding Schema:* currently, especially
> >in use cases like HDFS audit log monitoring, if developers want to group
> >by a certain key, they are forced to override the Schema
> >(Storm-specific), which is neither good nor reusable.
> >
> >4. *Environment Decoupled:* currently the stream (metadata) / DAG
> >(logic) / environment (execution) are coupled with Storm's internal
> >implementation, which blocks becoming a metadata-driven pipeline
> >framework in the future, so we should decouple them.
> >
> >*5. Compatible with the field-based structure of the old framework and
> >applications.*
> >
> >*Sample Application*
> >
> >case class Entity(name:String,value:Double,var inc:Int=0)
> >
> >val tuples = Seq(
> >Entity("a", 1),
> >Entity("a", 2),
> >Entity("a", 3),
> >Entity("b", 2),
> >Entity("c", 3),
> >Entity("d", 3)
> >)
> >
> >val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
> >
> >
> >*// The DAG is automatically type-aware: Entity -> Tuple2 -> Tuple3*
> >
> >env.from(tuples)
> >*.groupByKey(_.name)*
> >.map(o => {o.inc += 2;o})
> >.filter(_.name != "b")
> >.filter(_.name != "c")
> >*.groupByKey(o => (o.name, o.value))*
> >.map(o => (o.name,o))
> >.map(o => (o._1,o._2.value,o._2.inc))
> >.foreach(println)
> >
> > env.execute()
> >
> >
> >*Types are transparent to the developer during both the DAG compile
> >(programming) and runtime (metadata) phases*
> >
> >2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before expanded DAG
> >{
> >IterableStreamProducer[*Entity*]_1{1} -> GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
> >GroupByKeyProducer[<function1>(*Entity*)]_2{1} -> MapProducer[Entity]_3{1} in shuffleGroup
> >MapProducer[*Entity*]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
> >FilterProducer[*Entity*]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
> >FilterProducer[*Entity*]_5{1} -> GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
> >GroupByKeyProducer[<function1>(*Entity*)]_6{1} -> MapProducer[Tuple2]_7{1} in shuffleGroup
> >MapProducer[*Tuple2*]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
> >MapProducer[*Tuple3*]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
> >}
> >2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After expanded DAG
> >{
> >IterableStreamProducer[*Entity*]_1{1} -> MapProducer[Entity]_3{1} in groupByKey(<function1>)
> >MapProducer[*Entity*]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
> >FilterProducer[*Entity*]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
> >FilterProducer[*Entity*]_5{1} -> MapProducer[Tuple2]_7{1} in groupByKey(<function1>)
> >MapProducer[*Tuple2*]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
> >MapProducer[*Tuple3*]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
> >}
> >2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]: Storm topology DAG
> >{
> >Spout[IterableStreamProducer[*Entity*]_1]{1} -> Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
> >Bolt[MapProducer[*Entity*]_3]{1} -> Bolt[FilterProducer[Entity]_4]{1} in shuffleGroup
> >Bolt[FilterProducer[*Entity*]_4]{1} -> Bolt[FilterProducer[Entity]_5]{1} in shuffleGroup
> >Bolt[FilterProducer[*Entity*]_5]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in groupByKey(<function1>)
> >Bolt[MapProducer[*Tuple2*]_7]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in shuffleGroup
> >Bolt[MapProducer[*Tuple3*]_8]{1} -> Bolt[ForeachProducer[void]_9]{1} in shuffleGroup
> >}
> >
> >
> >Regards,
> >Hao
>
>
