[
https://issues.apache.org/jira/browse/EAGLE-66?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hao Chen updated EAGLE-66:
--------------------------
Description:
Main Features
1. Typesafe API: Currently the stream processing API is not type-safe (type
information is erased by the stream framework), so every programming interface
exposes Object/AnyRef to the developer, which is neither friendly nor
extensible for the framework user (application developer):
public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector)
So I propose the interface below, where all type information is transparent to
the developer, with no additional parameters needed, supported by Scala's
implicit TypeTag:
class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T]
StreamInfo: holds the stream's core information, including streamId,
processing-element id/name, and entity type info (Class/TypeTag).
StreamProtocol extends JavaStreamProtocol: provides the basic stream DSL API
plus a Java-compatible API.
class StreamInfo extends Serializable {
  val id: Int = UniqueId.incrementAndGetId()
  var name: String = null
  var streamId: String = null
  var parallelismNum: Int = 1
  var inKeyed: Boolean = false
  var outKeyed: Boolean = false
  var keySelector: KeySelector = null
  var typeClass: Class[_] = classOf[AnyRef]
  @transient implicit var typeTag: ru.TypeTag[_] = ru.typeTag[AnyRef]
}
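The TypeTag mechanism above can be illustrated with a minimal, self-contained sketch (the class name here is illustrative, not an Eagle class): the compiler materializes the implicit TypeTag at the call site, so the caller never passes type information explicitly and the full generic type survives erasure.

```scala
import scala.reflect.runtime.{universe => ru}

// Illustrative sketch, not the Eagle implementation: the implicit
// TypeTag[T] is filled in by the compiler at the call site, so the
// element type (including type parameters) is preserved past erasure.
class TypedStream[T](implicit val tag: ru.TypeTag[T]) {
  def typeName: String = tag.tpe.toString
}

val s = new TypedStream[List[Int]]
// The generic parameter is still visible at runtime via the TypeTag.
println(s.typeName)
```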
2. KeyValue-Based Structure: Currently the framework user (developer) has to
deal with field declarations over and over, so the framework and business
logic are tightly coupled. Following the StreamProtocol description, the user
should not have to care about framework-level details such as Storm's internal
data structure (List<Object> with Fields<String>), which is unfriendly for
developers. We should make sure the user focuses on business logic only, like:
env.from(tuples)
.groupByKey(_.name)
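The intended developer experience can be sketched on a plain Scala collection (only an analogy for the semantics, not the Eagle API): the key is a typed function of the record, so no string field names or List<Object> plumbing leak into user code.

```scala
// Sketch of key-based grouping semantics on an ordinary collection;
// Event is a made-up record type, not part of Eagle.
case class Event(name: String, value: Double)

val events = Seq(Event("a", 1), Event("b", 2), Event("a", 3))

// The key selector is a typed function: renaming the `name` field
// would fail at compile time, unlike a string-based field reference.
val grouped: Map[String, Seq[Event]] = events.groupBy(_.name)
```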
3. Spout Grouping Instead of Overriding Schema: Currently, especially in use
cases like HdfsAuditLog monitoring, a developer who wants to group by a
certain key is forced to override the Schema (Storm-specific), which is bad
practice and not reusable.
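The routing idea behind a key-based grouping can be sketched in a few lines (purely illustrative arithmetic, not the Storm API): records with equal keys deterministically land on the same task, which is why grouping can be declared on the wiring instead of being baked into a Schema override.

```scala
// Illustrative sketch of fields-grouping semantics: route each record
// to a task index derived from its key, so equal keys share a task.
// This is not Storm code, only the underlying idea.
def taskFor(key: Any, numTasks: Int): Int =
  Math.floorMod(key.hashCode, numTasks)

// Two records with the same key always pick the same task.
println(taskFor("user-1", 4) == taskFor("user-1", 4))
```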
4. Environment Decoupled: Currently the stream (metadata), DAG (logic), and
environment (execution) are coupled to Storm's internal implementation, which
prevents this from becoming a metadata-driven pipeline framework in the
future, so we should decouple them.
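One way to picture the decoupling is an abstract execution environment that the DSL targets, with Storm as just one backend. A hypothetical sketch follows; these traits and names are illustrative, not Eagle classes.

```scala
// Hypothetical sketch of decoupled execution: the pipeline logic only
// sees the abstract trait, and backends can be swapped freely.
trait ExecutionEnv {
  def execute(dagNodes: Seq[String]): String
}

class LocalEnv extends ExecutionEnv {
  def execute(dagNodes: Seq[String]): String = s"local:${dagNodes.size}"
}

class StormEnvSketch extends ExecutionEnv {
  def execute(dagNodes: Seq[String]): String = s"storm:${dagNodes.size}"
}

// The DAG definition never mentions a concrete backend.
def run(env: ExecutionEnv): String =
  env.execute(Seq("from", "groupByKey", "map"))
```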
5. Compatible with the field-based structure of the old framework and applications.
Sample Application
case class Entity(name: String, value: Double, var inc: Int = 0)

val tuples = Seq(
  Entity("a", 1),
  Entity("a", 2),
  Entity("a", 3),
  Entity("b", 2),
  Entity("c", 3),
  Entity("d", 3)
)

val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)

// The DAG is inferred automatically, type by type: Entity -> Tuple2 -> Tuple3
env.from(tuples)
  .groupByKey(_.name)
  .map { o => o.inc += 2; o }
  .filter(_.name != "b")
  .filter(_.name != "c")
  .groupByKey(o => (o.name, o.value))
  .map(o => (o.name, o))
  .map(o => (o._1, o._2.value, o._2.inc))
  .foreach(println)

env.execute()
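The element-wise behavior of the sample can be traced on a plain Seq (a sketch of the semantics only, not the Eagle runtime; groupByKey just changes partitioning, so it passes records through unchanged):

```scala
// Tracing the sample pipeline with ordinary collection operations;
// this mirrors its element-wise semantics, not the Eagle runtime.
case class Entity(name: String, value: Double, var inc: Int = 0)

val tuples = Seq(
  Entity("a", 1), Entity("a", 2), Entity("a", 3),
  Entity("b", 2), Entity("c", 3), Entity("d", 3)
)

val out = tuples
  .map { o => o.inc += 2; o }   // inc becomes 2 on every record
  .filter(_.name != "b")
  .filter(_.name != "c")
  .map(o => (o.name, o))
  .map(o => (o._1, o._2.value, o._2.inc))

// Four records survive: the three "a" entities and the "d" entity.
out.foreach(println)
```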
Types stay transparent to the developer during both the DAG-compilation
(programming) phase and the runtime (metadata) phase:
2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before expanded DAG
{
    IterableStreamProducer[Entity]_1{1} -> GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
    GroupByKeyProducer[<function1>(Entity)]_2{1} -> MapProducer[Entity]_3{1} in shuffleGroup
    MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
    FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
    FilterProducer[Entity]_5{1} -> GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
    GroupByKeyProducer[<function1>(Entity)]_6{1} -> MapProducer[Tuple2]_7{1} in shuffleGroup
    MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
    MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
}
2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After expanded DAG
{
    IterableStreamProducer[Entity]_1{1} -> MapProducer[Entity]_3{1} in groupByKey(<function1>)
    MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
    FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
    FilterProducer[Entity]_5{1} -> MapProducer[Tuple2]_7{1} in groupByKey(<function1>)
    MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
    MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
}
2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]: Storm topology DAG
{
    Spout[IterableStreamProducer[Entity]_1]{1} -> Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
    Bolt[MapProducer[Entity]_3]{1} -> Bolt[FilterProducer[Entity]_4]{1} in shuffleGroup
    Bolt[FilterProducer[Entity]_4]{1} -> Bolt[FilterProducer[Entity]_5]{1} in shuffleGroup
    Bolt[FilterProducer[Entity]_5]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in groupByKey(<function1>)
    Bolt[MapProducer[Tuple2]_7]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in shuffleGroup
    Bolt[MapProducer[Tuple3]_8]{1} -> Bolt[ForeachProducer[void]_9]{1} in shuffleGroup
}
was:
Firstly, in the application-developer space, a developer should not need to
care about the internal data structures exchanged between processing elements;
the framework should let the user process the stream just like a collection of
typed objects. For example, in the following case, we should allow:
In a type-safe way: groupBy(_.user)
In a field-based way: groupBy("user")
@StreamDef("hdfsAuditLogStream")
case class AuditLog(timestamp: Long, user: String, path: String, cmd: String)

object TypedSafeApp extends App {
  val ec = ExecutionContext.get[StormExecutionContext]
  ec.fromKafka(ec.getConfig) { bytes =>
      AuditLog(1000, "someone", "/tmp/private", "open")
    }.parallism(123)                 // Stream[AuditLog]
    .groupBy(_.user).parallism(12)   // Stream[AuditLog]
    .map { obj => (obj.user, obj) }  // Stream[(String, AuditLog)]
    .groupBy(_._1)                   // Stream[(String, AuditLog)]
    .map(_._2)                       // Stream[AuditLog]
    .alert.persistAndEmail           // Stream[AlertAPIEntity]
}
In the framework space, especially for the groupBy semantics, nothing should
look different to the developer, but internally the exchanged structure is
(AnyRef, AuditLog):
For groupBy(_.user) with an object obj of type AuditLog, it is always
(obj.user, obj) + fieldsGrouping by field 0 for Storm.
For groupBy("user") with an object obj of type AuditLog, it is
(readValueByReflect("user", obj), obj) + fieldsGrouping by field 0 for Storm.
> Eagle TypeSafe Stream Processing DSL
> ------------------------------------
>
> Key: EAGLE-66
> URL: https://issues.apache.org/jira/browse/EAGLE-66
> Project: Eagle
> Issue Type: Improvement
> Affects Versions: 0.3.0
> Reporter: Hao Chen
> Assignee: Hao Chen
> Fix For: 0.3.0
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)