[ 
https://issues.apache.org/jira/browse/EAGLE-66?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hao Chen updated EAGLE-66:
--------------------------
    Description: 
h1. Main Features

1. Typesafe API: Currently the stream processing API is not type-safe (type 
information is erased by the stream framework), and all programming interfaces 
expose Object/AnyRef to the developer, which is neither friendly nor extensible 
for framework users (application developers):

{code}
public void flatMap(java.util.List<Object> input,
                    Collector<Tuple2<String, AlertAPIEntity>> outputCollector)
{code}

So I propose the following interface, where all type information is transparent 
to the developer, no additional parameters are needed, and the type is captured 
by Scala's implicit TypeTag:
{code}class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T]{code}
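For illustration, here is a minimal, self-contained sketch of how an implicit TypeTag lets the framework keep the concrete entity type without the caller passing anything extra (the StreamProducer and AlertEvent names below are hypothetical stand-ins, not the real Eagle classes):

```scala
import scala.reflect.runtime.{universe => ru}

// Hypothetical stand-in for the proposed producer: the compiler
// materializes the TypeTag implicitly at the call site, so the concrete
// type survives even though the user-facing API takes no type argument.
class StreamProducer[T](implicit val typeTag: ru.TypeTag[T]) {
  def entityType: ru.Type = typeTag.tpe
}

case class AlertEvent(name: String, value: Double)

val producer = new StreamProducer[AlertEvent]
println(producer.entityType)  // the concrete entity type, not AnyRef
```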

* StreamInfo: holds the stream's core information, including streamId, 
processing element id/name, and entity type info (class/TypeTag)
* StreamProtocol extends JavaStreamProtocol: provides the basic stream DSL API 
plus a Java-compatible API

{code}
class StreamInfo extends Serializable {
  val id: Int = UniqueId.incrementAndGetId()
  var name: String = null
  var streamId: String = null
  var parallelismNum: Int = 1
  var inKeyed: Boolean = false
  var outKeyed: Boolean = false
  var keySelector: KeySelector = null

  var typeClass: Class[_] = classOf[AnyRef]
  @transient implicit var typeTag: ru.TypeTag[_] = ru.typeTag[AnyRef]
}
{code}

The StreamInfo can also be shared across the runtime as an implicit context for 
the execution layer:

{code}
abstract class AbstractStreamBolt[T](val fieldsNum: Int = 0, val ack: Boolean = true)(implicit streamInfo: StreamInfo) extends BaseRichBolt
{code}
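As a rough sketch of why the implicit context is convenient (EchoBolt and the trimmed-down StreamInfo below are hypothetical examples, not Eagle code): any execution-layer component declared with an implicit StreamInfo parameter picks up the shared metadata automatically, with no manual wiring:

```scala
// Simplified stand-in for StreamInfo: just the fields this sketch needs.
class StreamInfo extends Serializable {
  var streamId: String = null
  var parallelismNum: Int = 1
}

// A hypothetical bolt: the implicit parameter is filled in by the
// compiler from whatever StreamInfo is in scope at construction time.
class EchoBolt(implicit val info: StreamInfo) {
  def describe: String = s"${info.streamId} x${info.parallelismNum}"
}

implicit val info: StreamInfo = new StreamInfo
info.streamId = "eventStream"

val bolt = new EchoBolt  // no StreamInfo passed explicitly
println(bolt.describe)   // eventStream x1
```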

2. KeyValue-Based Structure: currently framework users (developers) have to 
deal with field declarations over and over, so framework and business logic are 
tightly coupled. Following the StreamProtocol description, users should not 
need to care about framework-level details such as Storm's internal data 
structure (List<Object> plus Fields), which is unfriendly to developers; we 
should make sure users focus on business logic only, e.g.:

{code}
env.from(tuples)
        .groupByKey(_.name)
{code}

3. Spout grouping instead of overriding Schema: currently, especially in use 
cases like HdfsAuditLog monitoring, a developer who wants to group by a certain 
key is forced to override the Schema (Storm-specific), which is bad practice 
and not reusable.

4. Environment Decoupling: currently the stream (metadata), DAG (logic), and 
environment (execution) are coupled to Storm's internal implementation, which 
blocks evolution toward a metadata-driven pipeline framework, so we should 
decouple them.
 
5. Compatibility with the field-based structure used by the old framework and 
applications.

6. Configuration: an enhanced config wrapper on top of typesafe-config, 
supporting get/set/default semantics and integrated with the 
ExecutionEnvironment:

{code}
val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
val streamName = env.config.get[String]("eagle.stream.name", "eventStream")
val streamExecutorId = env.config.get[String]("eagle.stream.executor", s"${streamName}Executor")
env.config.set("dataSourceConfig.deserializerClass", classOf[JsonMessageDeserializer].getCanonicalName)
{code}
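The get-with-default/set semantics can be sketched in a few lines; the ConfigWrapper below is a hypothetical, map-backed stand-in, not the real typesafe-config-based implementation:

```scala
import scala.collection.mutable

// Map-backed stand-in for the proposed config wrapper: get falls back
// to the supplied default when the key is absent; set overrides it.
class ConfigWrapper {
  private val store = mutable.Map[String, Any]()
  def set(key: String, value: Any): Unit = store(key) = value
  def get[T](key: String, default: T): T =
    store.get(key).map(_.asInstanceOf[T]).getOrElse(default)
}

val config = new ConfigWrapper
val before = config.get[String]("eagle.stream.name", "eventStream")  // default applies
config.set("eagle.stream.name", "auditStream")
val after = config.get[String]("eagle.stream.name", "eventStream")   // explicit value wins
```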

h1. Sample Application

{code}
case class Entity(name: String, value: Double, var inc: Int = 0)

val tuples = Seq(
  Entity("a", 1),
  Entity("a", 2),
  Entity("a", 3),
  Entity("b", 2),
  Entity("c", 3),
  Entity("d", 3)
)

val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)

// The DAG is automatically type-aware: Entity -> Tuple2 -> Tuple3
env.from(tuples)
  .groupByKey(_.name)
  .map(o => { o.inc += 2; o })
  .filter(_.name != "b")
  .filter(_.name != "c")
  .groupByKey(o => (o.name, o.value))
  .map(o => (o.name, o))
  .map(o => (o._1, o._2.value, o._2.inc))
  .foreach(println)

env.execute()
{code}
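The transformation chain can be sanity-checked locally with plain Scala collections, without a Storm cluster. This sketch mirrors the maps and filters of the sample application (the groupByKey steps only affect partitioning, so they are omitted here):

```scala
case class Entity(name: String, value: Double, var inc: Int = 0)

val tuples = Seq(
  Entity("a", 1), Entity("a", 2), Entity("a", 3),
  Entity("b", 2), Entity("c", 3), Entity("d", 3)
)

// Same per-record logic as the pipeline: bump inc, drop "b" and "c",
// then project to (name, value, inc) triples.
val result = tuples
  .map { o => o.inc += 2; o }
  .filter(_.name != "b")
  .filter(_.name != "c")
  .map(o => (o.name, o.value, o.inc))

result.foreach(println)
```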

Type information is transparent to the developer during both the DAG 
compilation (programming) and runtime (metadata) phases:

{code}
2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before expanded DAG
{
        IterableStreamProducer[Entity]_1{1} -> GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
        GroupByKeyProducer[<function1>(Entity)]_2{1} -> MapProducer[Entity]_3{1} in shuffleGroup
        MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
        FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
        FilterProducer[Entity]_5{1} -> GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
        GroupByKeyProducer[<function1>(Entity)]_6{1} -> MapProducer[Tuple2]_7{1} in shuffleGroup
        MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
        MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
}
2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After expanded DAG
{
        IterableStreamProducer[Entity]_1{1} -> MapProducer[Entity]_3{1} in groupByKey(<function1>)
        MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
        FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
        FilterProducer[Entity]_5{1} -> MapProducer[Tuple2]_7{1} in groupByKey(<function1>)
        MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
        MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
}
2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]: Storm topology DAG
{
        Spout[IterableStreamProducer[Entity]_1]{1} -> Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
        Bolt[MapProducer[Entity]_3]{1} -> Bolt[FilterProducer[Entity]_4]{1} in shuffleGroup
        Bolt[FilterProducer[Entity]_4]{1} -> Bolt[FilterProducer[Entity]_5]{1} in shuffleGroup
        Bolt[FilterProducer[Entity]_5]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in groupByKey(<function1>)
        Bolt[MapProducer[Tuple2]_7]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in shuffleGroup
        Bolt[MapProducer[Tuple3]_8]{1} -> Bolt[ForeachProducer[void]_9]{1} in shuffleGroup
}
{code}



> Eagle TypeSafe Stream Processing DSL
> ------------------------------------
>
>                 Key: EAGLE-66
>                 URL: https://issues.apache.org/jira/browse/EAGLE-66
>             Project: Eagle
>          Issue Type: Improvement
>    Affects Versions: 0.3.0
>            Reporter: Hao Chen
>            Assignee: Hao Chen
>             Fix For: 0.3.0
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
