[
https://issues.apache.org/jira/browse/EAGLE-66?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hao Chen updated EAGLE-66:
--------------------------
Description:
Main Features
1. Typesafe API: Currently the stream processing API is not type-safe (type
information is erased by the stream framework). All programming interfaces
exposed to developers work on Object/AnyRef, which is neither friendly nor
extensible for framework users (application developers):
{code}
public void flatMap(java.util.List<Object> input, Collector<Tuple2<String,
AlertAPIEntity>> outputCollector)
{code}
So I propose the following interface (all type information is transparent to
the developer; no additional parameters are needed, thanks to Scala's implicit
TypeTag):
{code}
class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T]
{code}
* StreamInfo: contains the core stream information, including streamId,
processing element id/name, and entity type info (class/TypeTag)
* StreamProtocol extends JavaStreamProtocol: contains the basic stream DSL API
and a Java-compatible API
{code}
import scala.reflect.runtime.{universe => ru}

class StreamInfo extends Serializable {
  val id: Int = UniqueId.incrementAndGetId()
  var name: String = null
  var streamId: String = null
  var parallelismNum: Int = 1
  var inKeyed: Boolean = false
  var outKeyed: Boolean = false
  var keySelector: KeySelector = null
  var typeClass: Class[_] = classOf[AnyRef]
  // Entity type info; @transient, so it is skipped during serialization
  @transient implicit var typeTag: ru.TypeTag[_] = ru.typeTag[AnyRef]
}
{code}
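To illustrate how an implicit type tag can carry element types through a DSL without extra parameters, here is a minimal sketch. It is not the Eagle implementation: TypedStream, Event and TypedStreamDemo are hypothetical names, and it uses ClassTag from the Scala standard library in place of the TypeTag mentioned above so that it needs nothing beyond scala-library.

```scala
import scala.reflect.ClassTag

// Hypothetical element type, used only for this illustration.
case class Event(name: String, value: Double)

// Hypothetical, simplified stand-in for the proposed StreamProducer[T].
// The implicit ClassTag is filled in by the compiler at each call site.
class TypedStream[T](val elements: Seq[T])(implicit val tag: ClassTag[T]) {
  // Runtime view of the element type, recovered from the tag.
  def typeName: String = tag.runtimeClass.getName

  // The result type R is inferred from f; its tag is captured implicitly.
  def map[R: ClassTag](f: T => R): TypedStream[R] =
    new TypedStream(elements.map(f))

  def filter(p: T => Boolean): TypedStream[T] =
    new TypedStream(elements.filter(p))
}

object TypedStreamDemo {
  val source = new TypedStream(Seq(Event("a", 1), Event("b", 2)))
  // No type annotations needed: the compiler infers TypedStream[String].
  val names = source.filter(_.name != "b").map(_.name)
}
```

The caller never passes a class or type argument explicitly, yet each stage can still report its element type at runtime, which is the property the proposal relies on.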
2. KeyValue Based Structure: currently framework users (developers) have to
handle field declarations again and again, and framework and business logic
are highly coupled. According to the StreamProtocol description, users should
not have to care about framework-level details such as Storm's internal data
structure (List<Object> with Fields<String>), which is not developer-friendly.
We should make sure users can focus on business logic only, like:
{code}
env.from(tuples)
.groupByKey(_.name)
{code}
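As a rough sketch of what grouping by a key selector could mean underneath, the snippet below routes an element to a downstream task from the key that a plain function extracts, with no field names declared. AuditEvent, KeyedRouting and route are hypothetical names, and hash-based routing is an assumption made for illustration rather than a description of how Eagle or Storm partitions tuples.

```scala
// Hypothetical element type, used only for this illustration.
case class AuditEvent(name: String, value: Double)

object KeyedRouting {
  // Picks a downstream task index from the selected key, so elements with
  // equal keys always land on the same task. floorMod keeps the index
  // non-negative even when the hash is negative.
  def route[T, K](element: T, numTasks: Int)(keyOf: T => K): Int =
    Math.floorMod(keyOf(element).##, numTasks)
}
```

For example, KeyedRouting.route(AuditEvent("a", 1), 4)(_.name) and KeyedRouting.route(AuditEvent("a", 2), 4)(_.name) return the same index because both elements share the key "a". A composite key works the same way with a selector such as e => (e.name, e.value).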
3. Spout grouping instead of overriding Schema: currently, especially in use
cases like HdfsAuditLog Monitoring, if developers want to group by a certain
key, they are forced to override Schema (specific to Storm), which is neither
clean nor reusable.
4. Environment Decoupled: currently the stream (metadata), DAG (logic), and
environment (execution) are coupled with Storm's internal implementation,
which is an obstacle to becoming a metadata-driven pipeline framework in the
future, so we should decouple them.
5. Compatible with the field-based structure used in the old framework and
applications.
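The decoupling described in item 4 could look roughly like the sketch below: the logical DAG is plain metadata, and an execution backend is the only part that knows about a concrete engine. All names here (DecoupledPipeline, LogicalDag, ExecutionBackend, DescribeOnlyBackend) are hypothetical, and the backend only renders text instead of building a real Storm topology.

```scala
object DecoupledPipeline {
  // Logical plan: pure metadata, with no engine-specific types.
  case class StreamNode(id: Int, name: String)
  case class StreamEdge(from: Int, to: Int, grouping: String)
  case class LogicalDag(nodes: Seq[StreamNode], edges: Seq[StreamEdge])

  // Execution side: each engine translates the same logical plan.
  trait ExecutionBackend {
    def compile(dag: LogicalDag): Seq[String]
  }

  // Hypothetical backend that only describes the plan as text.
  object DescribeOnlyBackend extends ExecutionBackend {
    def compile(dag: LogicalDag): Seq[String] = {
      val names = dag.nodes.map(n => n.id -> n.name).toMap
      dag.edges.map(e => s"${names(e.from)} -> ${names(e.to)} in ${e.grouping}")
    }
  }
}
```

Because the plan does not reference the engine, a second backend could in principle be added without touching the code that builds the DAG.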
Sample Application
{code}
case class Entity(name: String, value: Double, var inc: Int = 0)

val tuples = Seq(
  Entity("a", 1),
  Entity("a", 2),
  Entity("a", 3),
  Entity("b", 2),
  Entity("c", 3),
  Entity("d", 3)
)
val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
// The DAG tracks element types automatically: Entity -> Tuple2 -> Tuple3
env.from(tuples)
  .groupByKey(_.name)
  .map(o => { o.inc += 2; o })
  .filter(_.name != "b")
  .filter(_.name != "c")
  .groupByKey(o => (o.name, o.value))
  .map(o => (o.name, o))
  .map(o => (o._1, o._2.value, o._2.inc))
  .foreach(println)
env.execute()
{code}
Types are transparent to the developer during both the DAG compilation
(programming) and runtime (metadata) phases:
{code}
2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before expanded DAG
{
IterableStreamProducer[Entity]_1{1} -> GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
GroupByKeyProducer[<function1>(Entity)]_2{1} -> MapProducer[Entity]_3{1} in shuffleGroup
MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
FilterProducer[Entity]_5{1} -> GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
GroupByKeyProducer[<function1>(Entity)]_6{1} -> MapProducer[Tuple2]_7{1} in shuffleGroup
MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
}
2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After expanded DAG
{
IterableStreamProducer[Entity]_1{1} -> MapProducer[Entity]_3{1} in groupByKey(<function1>)
MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
FilterProducer[Entity]_5{1} -> MapProducer[Tuple2]_7{1} in groupByKey(<function1>)
MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
}
2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]: Storm topology DAG
{
Spout[IterableStreamProducer[Entity]_1]{1} -> Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
Bolt[MapProducer[Entity]_3 ]{1} -> Bolt[FilterProducer[Entity]_4]{1} in shuffleGroup
Bolt[FilterProducer[Entity]_4 ]{1} -> Bolt[FilterProducer[Entity]_5]{1} in shuffleGroup
Bolt[FilterProducer[Entity]_5 ]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in groupByKey(<function1>)
Bolt[MapProducer[Tuple2]_7 ]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in shuffleGroup
Bolt[MapProducer[Tuple3]_8 ]{1} -> Bolt[ForeachProducer[void]_9]{1} in shuffleGroup
}
{code}
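The difference between the "before" and "after" DAGs in the log above is that each GroupByKeyProducer node disappears and its grouping moves onto the edge between its neighbours. A minimal sketch of that rewrite for a linear chain follows. DagExpansion, Op, GroupBy, Edge and expand are hypothetical names, and this is only an illustration of the idea read off the log output, not the actual expansion code.

```scala
object DagExpansion {
  sealed trait Node { def id: Int }
  case class Op(id: Int, name: String) extends Node // map, filter, foreach, ...
  case class GroupBy(id: Int) extends Node          // logical grouping marker

  case class Edge(from: Int, to: Int, grouping: String)

  // Walks a linear chain, drops GroupBy nodes, and marks the edge that
  // skips over a GroupBy as "groupByKey"; all other edges are "shuffleGroup".
  def expand(chain: Seq[Node]): Seq[Edge] = {
    val edges = Seq.newBuilder[Edge]
    var previous: Option[Op] = None
    var keyed = false
    chain.foreach {
      case _: GroupBy => keyed = true
      case op: Op =>
        previous.foreach { p =>
          edges += Edge(p.id, op.id, if (keyed) "groupByKey" else "shuffleGroup")
        }
        previous = Some(op)
        keyed = false
    }
    edges.result()
  }
}
```

Feeding it the nine-node chain from the sample application (GroupBy at positions 2 and 6) yields six edges, with 1 -> 3 and 5 -> 7 marked as groupByKey, which matches the shape of the expanded DAG in the log.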
was:
Main Features
1. Typesafe API: Currently the stream processing API is not type-safe (Type
info are erased by stream framework), all programming interfaces for developer
are faced to Object/AnyRef, which is not very friendly and extensible for
framework user (application developer):
public void flatMap(java.util.List<Object> input, Collector<Tuple2<String,
AlertAPIEntity>> outputCollector)
So i propose the interface as (all type information are transparent for
developer, not need additional parameters, supported by Scala implicit TypeTag)
class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T]
StreamInfo: contains Stream core information including streamId, processing
element id/name, entity type info (class/TypeTag)
StreamProtocol extends JavaStreamProtocol: contains basic Stream DSL API and
java-compatible API
class StreamInfo extends Serializable{
val id:Int = UniqueId.incrementAndGetId()
var name: String = null
var streamId:String=null
var parallelismNum: Int = 1
var inKeyed:Boolean = false
var outKeyed:Boolean = false
var keySelector:KeySelector = null
var typeClass:Class[_] = classOf[AnyRef]
@transient implicit var typeTag:ru.TypeTag[_] = ru.typeTag[AnyRef]
}
2. KeyValue Based Structure: currently framework user (developer) have to
handle with field declaration again and again, and framework and business logic
are highly coupled, according to the StreamProtocol description, user should
not care about framework level detail like internal data structure for Storm
using List<Object> with Fields<String> which is not friendly for developer, we
should make sure user focus on business logic only like:
env.from(tuples)
.groupByKey(_.name)
3. Spout grouping instead of overriding Schema: currently especially in use
case like HdfsAuditLog Monitoring, if developer wants to groupby certain key,
they are forced to override Schema (specific for storm) , which is not good and
un-reusable.
4. Environment Decoupled: currently the stream (metadata) /dag (logic) /
environment (execution) are coupled with storm internal implementation, which
is not good for becoming a metadata-driven pipeline framework in future, so we
should decouple it.
5. Compatible with Field-based Structure in old framework and application.
Sample Application
case class Entity(name:String,value:Double,var inc:Int=0)
val tuples = Seq(
Entity("a", 1),
Entity("a", 2),
Entity("a", 3),
Entity("b", 2),
Entity("c", 3),
Entity("d", 3)
)
val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
// DAG is fully automatically aware: Entity -> Tuple2 -> Tuple3
env.from(tuples)
.groupByKey(_.name)
.map(o => {o.inc += 2;o})
.filter(_.name != "b")
.filter(_.name != "c")
.groupByKey(o=>(o.name,o.value))
.map(o => (o.name,o))
.map(o => (o._1,o._2.value,o._2.inc))
.foreach(println)
env.execute()
Type is transparent for developer during both DAG compiling (programming) and
runtime (metadata) phases
2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before expanded
DAG
{
IterableStreamProducer[Entity]_1{1} ->
GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
GroupByKeyProducer[<function1>(Entity)]_2{1} ->
MapProducer[Entity]_3{1} in shuffleGroup
MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in
shuffleGroup
FilterProducer[Entity]_5{1} ->
GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
GroupByKeyProducer[<function1>(Entity)]_6{1} ->
MapProducer[Tuple2]_7{1} in shuffleGroup
MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
}
2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After expanded DAG
{
IterableStreamProducer[Entity]_1{1} -> MapProducer[Entity]_3{1} in
groupByKey(<function1>)
MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in
shuffleGroup
FilterProducer[Entity]_5{1} -> MapProducer[Tuple2]_7{1} in
groupByKey(<function1>)
MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
}
2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]: Storm
topology DAG
{
Spout[IterableStreamProducer[Entity]_1]{1} ->
Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
Bolt[MapProducer[Entity]_3 ]{1} -> Bolt[FilterProducer[Entity]_4]{1} in
shuffleGroup
Bolt[FilterProducer[Entity]_4 ]{1} -> Bolt[FilterProducer[Entity]_5]{1}
in shuffleGroup
Bolt[FilterProducer[Entity]_5 ]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in
groupByKey(<function1>)
Bolt[MapProducer[Tuple2]_7 ]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in
shuffleGroup
Bolt[MapProducer[Tuple3]_8 ]{1} -> Bolt[ForeachProducer[void]_9]{1} in
shuffleGroup
}
> Eagle TypeSafe Stream Processing DSL
> ------------------------------------
>
> Key: EAGLE-66
> URL: https://issues.apache.org/jira/browse/EAGLE-66
> Project: Eagle
> Issue Type: Improvement
> Affects Versions: 0.3.0
> Reporter: Hao Chen
> Assignee: Hao Chen
> Fix For: 0.3.0