[
https://issues.apache.org/jira/browse/EAGLE-66?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15055717#comment-15055717
]
ASF GitHub Bot commented on EAGLE-66:
-------------------------------------
Github user RalphSu commented on a diff in the pull request:
https://github.com/apache/incubator-eagle/pull/26#discussion_r47477417
--- Diff:
eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestJavaReflectionUtils.java
---
@@ -0,0 +1,19 @@
+package org.apache.eagle.datastream;
--- End diff --
+1, the license header is missing.
> Eagle TypeSafe Stream Processing DSL
> ------------------------------------
>
> Key: EAGLE-66
> URL: https://issues.apache.org/jira/browse/EAGLE-66
> Project: Eagle
> Issue Type: Improvement
> Affects Versions: 0.3.0
> Reporter: Hao Chen
> Assignee: Hao Chen
> Fix For: 0.3.0
>
>
> h1. Main Features
> 1. Typesafe API: Currently the stream processing API is not type-safe (type
> information is erased by the stream framework). Every programming interface
> exposed to developers is typed as Object/AnyRef, which is neither friendly nor
> extensible for framework users (application developers), for example:
> {code}
> public void flatMap(java.util.List<Object> input,
>                     Collector<Tuple2<String, AlertAPIEntity>> outputCollector)
> {code}
> So I propose the following interface, in which all type information is
> transparent to the developer and no additional parameters are needed, relying
> on Scala's implicit TypeTag:
> {code}
> class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T]
> {code}
> * StreamInfo: contains the core stream information, including streamId,
> processing element id/name, and entity type info (class/TypeTag)
> * StreamProtocol (extends JavaStreamProtocol): contains the basic Stream DSL
> API and a Java-compatible API
> {code}
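> import scala.reflect.runtime.{universe => ru}
>
> // UniqueId and KeySelector are Eagle classes
> class StreamInfo extends Serializable {
>   val id: Int = UniqueId.incrementAndGetId()   // unique producer id
>   var name: String = null
>   var streamId: String = null
>   var parallelismNum: Int = 1
>   var inKeyed: Boolean = false
>   var outKeyed: Boolean = false
>   var keySelector: KeySelector = null
>   var typeClass: Class[_] = classOf[AnyRef]     // erased runtime class of the entity
>   // @transient: not serialized when the producer is shipped to workers
>   @transient implicit var typeTag: ru.TypeTag[_] = ru.typeTag[AnyRef]
> }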
> {code}
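To illustrate how an implicit tag lets a producer carry its element type without extra parameters, here is a minimal, self-contained sketch. It is a simplification under stated assumptions: `SketchProducer` is a hypothetical name, and it uses `ClassTag` (available in Scala 2 and 3 without scala-reflect) in place of the `TypeTag` the proposal relies on, so it captures only the erased runtime class rather than full generic type information.

```scala
import scala.reflect.ClassTag

// Hypothetical producer: the element type T is captured by an implicit
// ClassTag when the producer is created, so callers never pass type info.
class SketchProducer[T](val name: String)(implicit val tag: ClassTag[T]) {
  // Erased runtime class of the elements, recovered from the implicit tag.
  def typeClass: Class[_] = tag.runtimeClass

  // The compiler infers R from f and supplies ClassTag[R] implicitly,
  // so the downstream producer also knows its element type.
  // (f is not executed here; this sketch only tracks types.)
  def map[R: ClassTag](f: T => R): SketchProducer[R] =
    new SketchProducer[R](s"map($name)")
}

case class Entity(name: String, value: Double)

val source = new SketchProducer[Entity]("source")
val mapped = source.map(e => (e.name, e.value))

println(source.typeClass == classOf[Entity])        // true
println(mapped.typeClass == classOf[Tuple2[_, _]])  // true
```

Note that `map` never receives a type argument from the caller: the compiler infers `R` from the lambda and fills in the tag.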
> StreamInfo can also be shared across the runtime as an implicit context for
> the execution layer:
> {code}
> abstract class AbstractStreamBolt[T](val fieldsNum: Int = 0, val ack: Boolean = true)(
>     implicit streamInfo: StreamInfo) extends BaseRichBolt
> {code}
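The implicit-context idea can be sketched in plain Scala with no Storm dependency. `StreamInfoSketch`, `WorkerSketch`, `PrintWorker` and `CompilerScope` are hypothetical stand-ins for `StreamInfo`, `AbstractStreamBolt` and the topology compiler; the point is only that metadata placed in implicit scope once reaches every execution-layer object built there.

```scala
// Hypothetical stand-in for StreamInfo with just two of its fields.
case class StreamInfoSketch(streamId: String, parallelismNum: Int = 1)

// Mirrors the AbstractStreamBolt signature: metadata arrives as an implicit
// constructor argument instead of being wired by hand.
abstract class WorkerSketch[T](val fieldsNum: Int = 0, val ack: Boolean = true)(
    implicit val streamInfo: StreamInfoSketch) {
  def execute(input: T): String
}

// A concrete worker forwards the implicit it receives to its parent.
class PrintWorker(implicit info: StreamInfoSketch) extends WorkerSketch[String]()(info) {
  def execute(input: String): String = s"[${streamInfo.streamId}] $input"
}

// The compiling side puts the metadata in implicit scope once, and every
// worker constructed there picks it up.
object CompilerScope {
  implicit val info: StreamInfoSketch = StreamInfoSketch("eventStream")
  val worker = new PrintWorker
}

println(CompilerScope.worker.execute("hello")) // [eventStream] hello
```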
> 2. KeyValue Based Structure: currently framework users (developers) have to
> declare fields again and again, and the framework and business logic are
> tightly coupled. Following the StreamProtocol description, users should not
> have to care about framework-level details such as Storm's internal data
> structure (List<Object> with Fields<String>), which is unfriendly to
> developers. Users should focus on business logic only, for example:
> {code}
> env.from(tuples)
> .groupByKey(_.name)
> {code}
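One way to read `groupByKey(_.name)` is as hash partitioning on a user-supplied key, which is the general idea behind Storm's fields grouping. The sketch below is an assumption about the semantics, not Eagle's implementation; `taskFor` is a hypothetical helper.

```scala
// Hypothetical helper: choose one of `parallelism` downstream tasks by hashing
// the user-supplied key, so records with equal keys always meet on one task.
def taskFor[T, K](keyOf: T => K, parallelism: Int)(record: T): Int =
  Math.floorMod(keyOf(record).hashCode, parallelism)

case class Entity(name: String, value: Double)

val records = Seq(Entity("a", 1), Entity("a", 2), Entity("b", 2), Entity("c", 3))

// Equivalent of groupByKey(_.name) with 4 downstream tasks.
val route: Entity => Int = taskFor[Entity, String](_.name, 4)

records.groupBy(route).toSeq.sortBy(_._1).foreach { case (task, rs) =>
  println(s"task $task <- ${rs.map(_.name).mkString(",")}")
}
```

The user only supplies the key function; no field declarations are involved.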
> 3. Spout grouping instead of overriding Schema: currently, especially in use
> cases like HdfsAuditLog monitoring, a developer who wants to group by a
> certain key is forced to override the (Storm-specific) Schema, which is
> awkward and not reusable.
> 4. Environment Decoupled: currently the stream (metadata), DAG (logic), and
> environment (execution) are coupled with Storm's internal implementation,
> which stands in the way of evolving into a metadata-driven pipeline framework,
> so they should be decoupled.
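A minimal sketch of the decoupling, assuming the DAG is reduced to plain metadata that any environment can interpret. `Node`, `Dag`, `ExecutionEnv`, `DescribeEnv` and `SummaryEnv` are hypothetical names, not Eagle's `ExecutionEnvironment` API.

```scala
// Hypothetical, simplified model: the DAG is plain metadata.
final case class Node(id: Int, name: String)
final case class Dag(nodes: Seq[Node], edges: Seq[(Int, Int)])

// Each execution environment decides how to run the same DAG.
trait ExecutionEnv {
  def execute(dag: Dag): String
}

// Describes the plan edge by edge; a Storm-backed environment would instead
// compile the nodes into spouts and bolts.
class DescribeEnv extends ExecutionEnv {
  def execute(dag: Dag): String = {
    val nameOf = dag.nodes.map(n => n.id -> n.name).toMap
    dag.edges.map { case (from, to) => s"${nameOf(from)} -> ${nameOf(to)}" }.mkString("; ")
  }
}

// A second backend over the same metadata, with no change to the DAG.
class SummaryEnv extends ExecutionEnv {
  def execute(dag: Dag): String = s"${dag.nodes.size} nodes, ${dag.edges.size} edges"
}

val dag = Dag(Seq(Node(1, "source"), Node(2, "map"), Node(3, "sink")), Seq(1 -> 2, 2 -> 3))
val envs: Seq[ExecutionEnv] = Seq(new DescribeEnv, new SummaryEnv)
envs.foreach(env => println(env.execute(dag)))
// source -> map; map -> sink
// 3 nodes, 2 edges
```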
>
> 5. Compatibility with the field-based structure used by the old framework and existing applications.
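Compatibility can be sketched as an adapter that flattens a typed entity into the field names and positional values that field-based code expects. `toFieldBased` is hypothetical, not part of Eagle, and `productElementNames` requires Scala 2.13 or later.

```scala
import scala.jdk.CollectionConverters._

// Hypothetical adapter: flatten a case class into the field names and the
// positional values that field-based (Storm-style) code works with.
def toFieldBased(p: Product): (java.util.List[String], java.util.List[AnyRef]) = {
  val names  = p.productElementNames.toList                 // Scala 2.13+
  val values = p.productIterator.map(_.asInstanceOf[AnyRef]).toList
  (names.asJava, values.asJava)
}

case class Entity(name: String, value: Double, inc: Int = 0)

val (fields, values) = toFieldBased(Entity("a", 1.0))
println(fields) // [name, value, inc]
println(values) // [a, 1.0, 0]
```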
> 6. Configuration: an enhanced config wrapper on top of typesafe-config that
> supports get/set/default values and is integrated with ExecutionEnvironment:
> {code}
> val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
> val streamName = env.config.get[String]("eagle.stream.name", "eventStream")
> val streamExecutorId = env.config.get[String]("eagle.stream.executor", s"${streamName}Executor")
> env.config.set("dataSourceConfig.deserializerClass", classOf[JsonMessageDeserializer].getCanonicalName)
> {code}
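The get/set/default behaviour can be sketched without any dependency. `ConfigSketch` is a hypothetical stand-in backed by a mutable map, not the typesafe-config based wrapper itself; it uses a `ClassTag` so that a stored value of the wrong type falls back to the default instead of failing later.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// Hypothetical, dependency-free stand-in for the proposed config wrapper.
// The real proposal builds on typesafe-config; this sketch uses a plain map.
class ConfigSketch(initial: Map[String, Any] = Map.empty) {
  private val values = mutable.Map[String, Any]() ++= initial

  // Typed lookup: returns the stored value if it has type T, else the default.
  def get[T](key: String, default: T)(implicit tag: ClassTag[T]): T =
    values.get(key) match {
      case Some(v: T) => v
      case _          => default
    }

  def set(key: String, value: Any): Unit = values(key) = value
}

val config = new ConfigSketch(Map("eagle.stream.name" -> "auditStream"))
val streamName = config.get[String]("eagle.stream.name", "eventStream")
val streamExecutorId = config.get[String]("eagle.stream.executor", s"${streamName}Executor")
config.set("dataSourceConfig.deserializerClass", "JsonMessageDeserializer")

println(streamName)       // auditStream
println(streamExecutorId) // auditStreamExecutor
```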
> h1. Sample Application
> {code}
> case class Entity(name: String, value: Double, var inc: Int = 0)
> val tuples = Seq(
>   Entity("a", 1),
>   Entity("a", 2),
>   Entity("a", 3),
>   Entity("b", 2),
>   Entity("c", 3),
>   Entity("d", 3)
> )
> val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
> // Element types are tracked automatically along the DAG: Entity -> Tuple2 -> Tuple3
> env.from(tuples)
>   .groupByKey(_.name)
>   .map(o => { o.inc += 2; o })
>   .filter(_.name != "b")
>   .filter(_.name != "c")
>   .groupByKey(o => (o.name, o.value))
>   .map(o => (o.name, o))
>   .map(o => (o._1, o._2.value, o._2.inc))
>   .foreach(println)
> env.execute()
> {code}
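To see what the sample pipeline computes, the same chain can be replayed on plain Scala collections. This is a local sketch with no Eagle or Storm involved: `groupByKey` only affects how records are routed between tasks, so it has no counterpart here, and in a real topology the output order is not guaranteed.

```scala
// Local replay of the operator chain on plain collections (no Eagle, no Storm).
case class Entity(name: String, value: Double, var inc: Int = 0)

val tuples = Seq(
  Entity("a", 1), Entity("a", 2), Entity("a", 3),
  Entity("b", 2), Entity("c", 3), Entity("d", 3)
)

val result = tuples
  .map(o => { o.inc += 2; o })            // every entity gets inc = 2
  .filter(_.name != "b")
  .filter(_.name != "c")
  .map(o => (o.name, o))                  // Entity -> Tuple2
  .map(o => (o._1, o._2.value, o._2.inc)) // Tuple2 -> Tuple3

result.foreach(println)
// (a,1.0,2)
// (a,2.0,2)
// (a,3.0,2)
// (d,3.0,2)
```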
> Types are transparent to the developer during both the DAG compilation
> (programming) phase and the runtime (metadata) phase, as the log shows:
> {code}
> 2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before expanded DAG
> {
> IterableStreamProducer[Entity]_1{1} -> GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
> GroupByKeyProducer[<function1>(Entity)]_2{1} -> MapProducer[Entity]_3{1} in shuffleGroup
> MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
> FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
> FilterProducer[Entity]_5{1} -> GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
> GroupByKeyProducer[<function1>(Entity)]_6{1} -> MapProducer[Tuple2]_7{1} in shuffleGroup
> MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
> MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
> }
> 2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After expanded DAG
> {
> IterableStreamProducer[Entity]_1{1} -> MapProducer[Entity]_3{1} in groupByKey(<function1>)
> MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
> FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
> FilterProducer[Entity]_5{1} -> MapProducer[Tuple2]_7{1} in groupByKey(<function1>)
> MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
> MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
> }
> 2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]: Storm topology DAG
> {
> Spout[IterableStreamProducer[Entity]_1]{1} -> Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
> Bolt[MapProducer[Entity]_3 ]{1} -> Bolt[FilterProducer[Entity]_4]{1} in shuffleGroup
> Bolt[FilterProducer[Entity]_4 ]{1} -> Bolt[FilterProducer[Entity]_5]{1} in shuffleGroup
> Bolt[FilterProducer[Entity]_5 ]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in groupByKey(<function1>)
> Bolt[MapProducer[Tuple2]_7 ]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in shuffleGroup
> Bolt[MapProducer[Tuple3]_8 ]{1} -> Bolt[ForeachProducer[void]_9]{1} in shuffleGroup
> }
> {code}
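The difference between the "before" and "after" DAGs in the log is that each `GroupByKeyProducer` disappears and its key becomes the grouping of the edge into the next operator. Below is a hypothetical sketch of such a pass for a linear chain; `Op`, `Operator`, `GroupByKey`, `Edge` and `expand` are illustrative names, not Eagle's graph types.

```scala
// Hypothetical model of a linear operator chain.
sealed trait Op
final case class Operator(label: String) extends Op
final case class GroupByKey(key: String) extends Op

final case class Edge(from: String, to: String, grouping: String)

// Drop GroupByKey nodes and attach their key to the edge entering the next
// real operator; all other edges default to shuffleGroup.
def expand(chain: List[Op]): List[Edge] = {
  @annotation.tailrec
  def loop(rest: List[Op], prev: Option[String], pending: Option[String], acc: List[Edge]): List[Edge] =
    rest match {
      case Nil => acc.reverse
      case GroupByKey(k) :: tail => loop(tail, prev, Some(k), acc)
      case Operator(l) :: tail =>
        val grouping = pending.fold("shuffleGroup")(k => s"groupByKey($k)")
        val edge = prev.map(p => Edge(p, l, grouping))
        loop(tail, Some(l), None, edge.toList ::: acc)
    }
  loop(chain, None, None, Nil)
}

val chain = List(
  Operator("Iterable_1"), GroupByKey("name"), Operator("Map_3"),
  Operator("Filter_4"), Operator("Filter_5"), GroupByKey("(name,value)"),
  Operator("Map_7"), Operator("Map_8"), Operator("Foreach_9"))

val edges = expand(chain)
edges.foreach(e => println(s"${e.from} -> ${e.to} in ${e.grouping}"))
// Iterable_1 -> Map_3 in groupByKey(name)
// Map_3 -> Filter_4 in shuffleGroup
// Filter_4 -> Filter_5 in shuffleGroup
// Filter_5 -> Map_7 in groupByKey((name,value))
// Map_7 -> Map_8 in shuffleGroup
// Map_8 -> Foreach_9 in shuffleGroup
```

Keeping the grouping on the edge rather than as a separate node matches the Storm topology in the log, where no bolt is created for the grouping step.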
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)