[
https://issues.apache.org/jira/browse/EAGLE-66?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15055628#comment-15055628
]
ASF GitHub Bot commented on EAGLE-66:
-------------------------------------
GitHub user haoch opened a pull request:
https://github.com/apache/incubator-eagle/pull/26
EAGLE-66. Typesafe Streaming DSL and KeyValue based Grouping
https://issues.apache.org/jira/browse/EAGLE-66
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/haoch/incubator-eagle EAGLE-66
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-eagle/pull/26.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #26
----
commit 2d7003dd1200a500b329a518d63484600e1c2cdf
Author: Hao Chen
Date: 2015-12-02T16:28:58Z
[EAGLE-66] Keep type information in StreamProcessor
commit e3389cdd52f7f747db0968d9e233e4184b99f5ff
Author: Hao Chen
Date: 2015-12-04T09:25:39Z
[EAGLE-66] Decouple StreamProtocol and StreamProducer
commit e419ff886316bd536aa3c23aa337e51fa27a8c01
Author: Hao Chen
Date: 2015-12-04T13:33:21Z
[EAGLE-66] Decouple execution environment and config
commit 8b2d7a64f3f600cf49204cd5abff762e70e8791e
Author: Hao Chen
Date: 2015-12-04T17:10:32Z
[EAGLE-66] Simplify the stream processing API and configuration
commit 66abc2283092e04cb866b1280d1a286a14b3e117
Author: Hao Chen
Date: 2015-12-04T17:25:39Z
[EAGLE-66] Refactor AbstractStormSpoutProvider from abstract class to
interface StormSpoutProvider
commit 5b50c3fdb457f7f70f1654c8b19dcb1dae08b1cf
Author: Hao Chen
Date: 2015-12-04T18:18:14Z
[EAGLE-66] Decouple ExecutionEnvironments factory functions
commit 5ca63dfa0fd755f008970cfa46dafdf5ac89dc29
Author: Hao Chen
Date: 2015-12-04T18:20:54Z
[EAGLE-66] Rename ConfigWrapper to Configurator
commit f61fde9513de0e65dab7c22cd7f1552022764f58
Author: Hao Chen
Date: 2015-12-04T19:15:45Z
[EAGLE-66] Refactor StreamProtocol `withName` to `as` interface
commit fbc234473d67d75aeece3c8f2d8963b93f11b3a0
Author: Hao Chen
Date: 2015-12-05T22:19:17Z
[EAGLE-66] Basically workable key-value stream processing API
commit c20bfa141d5db4fd64790db87e7980c05984e671
Author: Hao Chen
Date: 2015-12-06T07:24:26Z
[EAGLE-66] Simplify DAG structure log
commit a0ca7fa15088aded4f06eec8ab44bc2cacbb699c
Author: Hao Chen
Date: 2015-12-06T08:27:53Z
[EAGLE-66] Implement from(Iterable) to replace fromSeq(Seq)
commit 69e68746520ca4648b3987f46d30acc6d6ced79a
Author: Hao Chen
Date: 2015-12-06T10:04:05Z
[EAGLE-66] Clean DAG graph log
commit 7a6a13636ace590c148fc5793893d88b580e1f45
Author: Hao Chen
Date: 2015-12-06T17:54:33Z
[EAGLE-66] Keep entity type info through dataflow and resolve scala/java
compatibility issues
commit 9d7b8ceaa6b0b75b132e319da9b6efa5d0f164e1
Author: Hao Chen
Date: 2015-12-07T07:18:24Z
[EAGLE-66] Refactored the package of datastream API
commit 26f6646f745ea4760f7934fb338d4b66a9c31510
Author: Hao Chen
Date: 2015-12-07T14:52:35Z
[EAGLE-66] Support init/reinit method for StreamInfo to extensibly resolve
issues like serialization/type and so on
commit 6fb171071a8f09223fd2757b1912bc52e1d4551d
Author: Hao Chen
Date: 2015-12-07T17:17:35Z
[EAGLE-66] Support type for JavaFlatMap
commit c90efc0d34b7156d239a315f115064dc4c15bda9
Author: Hao Chen
Date: 2015-12-08T12:24:24Z
[EAGLE-66] Fix DAG name and log printing
commit fd11d063896d00de40d06a33030f6be8fa85b881
Author: Hao Chen
Date: 2015-12-08T14:16:41Z
[EAGLE-66] Resolve JavaTypeCompatible and add StreamTypeExpansion
commit 3b52a02c73e4a4651fd719103d78ea7566d6340b
Author: Hao Chen
Date: 2015-12-14T06:46:12Z
EAGLE-66. Erase type information and fix hashCode problem for id
commit 9cc8f996fc885dec6d856e965a92185c5a78286a
Author: Hao Chen
Date: 2015-12-14T08:32:07Z
EAGLE-66. Merge conflicts from master and support dot digraph
Conflicts:
eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestDAGExpansion.scala
eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
----
> Eagle TypeSafe Stream Processing DSL
> ------------------------------------
>
> Key: EAGLE-66
> URL: https://issues.apache.org/jira/browse/EAGLE-66
> Project: Eagle
> Issue Type: Improvement
> Affects Versions: 0.3.0
> Reporter: Hao Chen
> Assignee: Hao Chen
> Fix For: 0.3.0
>
>
> h1. Main Features
> 1. Typesafe API: The current stream processing API is not type-safe (type
> information is erased by the stream framework), so every programming interface
> exposed to developers works with Object/AnyRef, which is neither friendly nor
> extensible for framework users (application developers):
> {code}
> public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector)
> {code}
> So I propose the following interface. All type information is transparent to
> the developer and needs no additional parameters, because it is captured by
> Scala implicit TypeTags:
> {code}
> class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T]
> {code}
> * StreamInfo: contains the core stream information, including streamId,
> processing element id/name, and entity type info (class/TypeTag).
> * StreamProtocol extends JavaStreamProtocol: contains the basic stream DSL API
> and a Java-compatible API.
> {code}
> import scala.reflect.runtime.{universe => ru}
>
> // UniqueId and KeySelector are defined elsewhere in Eagle.
> class StreamInfo extends Serializable {
>   val id: Int = UniqueId.incrementAndGetId()
>   var name: String = null
>   var streamId: String = null
>   var parallelismNum: Int = 1
>   var inKeyed: Boolean = false
>   var outKeyed: Boolean = false
>   var keySelector: KeySelector = null
>   var typeClass: Class[_] = classOf[AnyRef]
>   // @transient excludes the TypeTag from Java serialization
>   @transient implicit var typeTag: ru.TypeTag[_] = ru.typeTag[AnyRef]
> }
> {code}
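A minimal, self-contained sketch of how an implicit type witness lets a typed stream record its element type without any extra parameters from the developer. Assumptions: `TypedStream` is a hypothetical in-memory stand-in, not Eagle's `StreamProducer`, and it uses `scala.reflect.ClassTag` instead of the `TypeTag` named above, because `ClassTag` ships with the standard library while `TypeTag` needs scala-reflect and exists only in Scala 2. A `ClassTag` captures only the erased runtime class, so a mapped pair is recorded as `Tuple2` without its type arguments.

```scala
import scala.reflect.ClassTag

// Hypothetical sketch (not Eagle's StreamProducer): an in-memory stream that carries
// the runtime class of its element type next to the data itself.
final class TypedStream[T] private (val items: Seq[T], val typeClass: Class[_]) {
  // `R: ClassTag` is a context bound: the compiler materializes ClassTag[R]
  // from the inferred result type of `f`, so callers never pass it by hand.
  def map[R: ClassTag](f: T => R): TypedStream[R] =
    new TypedStream[R](items.map(f), implicitly[ClassTag[R]].runtimeClass)

  // filter keeps the element type, so the recorded class is reused as-is.
  def filter(p: T => Boolean): TypedStream[T] =
    new TypedStream[T](items.filter(p), typeClass)
}

object TypedStream {
  // Entry point: the element type of the source collection is captured here.
  def from[T: ClassTag](items: Iterable[T]): TypedStream[T] =
    new TypedStream[T](items.toSeq, implicitly[ClassTag[T]].runtimeClass)
}
```

The context bound is what keeps the API free of explicit type arguments: every `map` records the type of whatever its lambda returns.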
> StreamInfo can also be shared at runtime as an implicit context for the
> execution layer:
> {code}
> abstract class AbstractStreamBolt[T](val fieldsNum: Int = 0, val ack: Boolean = true)(implicit streamInfo: StreamInfo) extends BaseRichBolt
> {code}
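The same implicit-context pattern can be sketched without Storm. `StreamContext`, `Executor`, and `UpperCaseExecutor` below are hypothetical stand-ins for `StreamInfo` and `AbstractStreamBolt`; the point is only that a component declared with an implicit parameter list receives the stream metadata from the enclosing scope instead of through every call.

```scala
// Hypothetical stand-in for StreamInfo: metadata the execution layer needs.
final case class StreamContext(name: String, parallelism: Int, typeClass: Class[_])

// Hypothetical stand-in for AbstractStreamBolt: the context arrives through an
// implicit parameter list, so subclasses and callers never pass it explicitly.
abstract class Executor[T](val ack: Boolean = true)(implicit val ctx: StreamContext) {
  def process(value: T): String

  // Every member can read the context without it being threaded through calls.
  def describe: String = s"${ctx.name}[${ctx.typeClass.getSimpleName}]{${ctx.parallelism}}"
}

// The implicit constructor parameter `c` is forwarded to Executor automatically.
final class UpperCaseExecutor(implicit c: StreamContext) extends Executor[String]() {
  def process(value: String): String = s"${ctx.name}: ${value.toUpperCase}"
}
```

With an implicit `StreamContext` in scope, `new UpperCaseExecutor` needs no arguments at all.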
> 2. KeyValue-Based Structure: currently, framework users (developers) have to
> declare fields again and again, and the framework and business logic are
> tightly coupled. Following the StreamProtocol description, users should not
> need to care about framework-level details such as Storm's internal data
> structure (List<Object> with Fields<String>), which is unfriendly to
> developers. Users should be able to focus on business logic only, for example:
> {code}
> env.from(tuples)
> .groupByKey(_.name)
> {code}
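One way to picture what key-based grouping guarantees: each element is routed to a downstream task chosen from the hash of its key, so every element with the same key reaches the same task, which is the guarantee Storm's fields grouping provides. This is an assumption-level model; `KeyGrouping`, `taskFor`, and `partition` are hypothetical names, and hash-modulo routing is not necessarily the exact function Eagle or Storm uses.

```scala
object KeyGrouping {
  // Index of the downstream task that handles `key`, in [0, parallelism).
  def taskFor[K](key: K, parallelism: Int): Int = {
    require(parallelism > 0, "parallelism must be positive")
    // ## is null-safe and consistent with ==; floorMod keeps the result
    // non-negative even when the hash code is negative.
    Math.floorMod(key.##, parallelism)
  }

  // Groups elements by the task their key routes to. The key function is in its
  // own parameter list so that `_.name` can be written without a type annotation.
  def partition[T, K](items: Seq[T], parallelism: Int)(keyOf: T => K): Map[Int, Seq[T]] =
    items.groupBy(item => taskFor(keyOf(item), parallelism))
}
```

Because the routing depends only on the key, the developer supplies a function such as `_.name` and never declares fields.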
> 3. Spout grouping instead of overriding Schema: currently, especially in use
> cases like HdfsAuditLog monitoring, a developer who wants to group by a certain
> key is forced to override the (Storm-specific) Schema, which is awkward and not
> reusable.
> 4. Environment decoupling: currently the stream (metadata), DAG (logic), and
> environment (execution) are coupled with Storm's internal implementation,
> which stands in the way of evolving into a metadata-driven pipeline framework,
> so they should be decoupled.
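A minimal sketch of the decoupling idea, under the assumption that a pipeline can be modelled as a plain description (a source plus a per-element step) that any environment is free to execute. `Pipeline`, `Environment`, and `LocalEnvironment` are hypothetical and far simpler than Eagle's classes; a Storm-backed environment would compile the same description into spouts and bolts instead of running it in-process.

```scala
// A pipeline is only a description: where elements come from and what happens to each one.
final case class Pipeline[A, B](name: String, source: Seq[A], step: A => Option[B]) {
  // Both operators compose functions; nothing runs until an environment executes them.
  def map[C](f: B => C): Pipeline[A, C] =
    Pipeline(name, source, (a: A) => step(a).map(f))

  def filter(p: B => Boolean): Pipeline[A, B] =
    Pipeline(name, source, (a: A) => step(a).filter(p))
}

object Pipeline {
  def from[A](name: String, source: Seq[A]): Pipeline[A, A] =
    Pipeline(name, source, (a: A) => Some(a))
}

// Execution is a separate concern: each environment decides how to run a description.
trait Environment {
  def execute[A, B](pipeline: Pipeline[A, B]): Seq[B]
}

// Runs every element sequentially in the current thread.
object LocalEnvironment extends Environment {
  def execute[A, B](pipeline: Pipeline[A, B]): Seq[B] =
    pipeline.source.flatMap(a => pipeline.step(a))
}
```

Since the description holds no reference to an environment, swapping `LocalEnvironment` for another implementation does not touch the pipeline code.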
>
> 5. Compatibility with the field-based structure used by the old framework and
> existing applications.
> 6. Configuration: an enhanced config wrapper on top of typesafe-config that
> supports get/set/default values and is integrated with ExecutionEnvironment:
> {code}
> val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
> val streamName = env.config.get[String]("eagle.stream.name","eventStream")
> val streamExecutorId = env.config.get[String]("eagle.stream.executor",s"${streamName}Executor")
>
> env.config.set("dataSourceConfig.deserializerClass",classOf[JsonMessageDeserializer].getCanonicalName)
> {code}
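The get/set/default behaviour can be sketched with the standard library alone. `SimpleConfig` is a hypothetical in-memory stand-in: it does not wrap typesafe-config as the proposal's wrapper does, and it uses an implicit `ClassTag` so that reading a key with the wrong type fails immediately instead of at some later cast.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// Hypothetical in-memory config wrapper (not Eagle's Configurator).
final class SimpleConfig(initial: Map[String, Any] = Map.empty) {
  private val values: mutable.Map[String, Any] = mutable.Map.empty[String, Any] ++= initial

  // Returns the stored value for `key`, or `default` when the key is absent.
  // The ClassTag turns the `v: T` pattern into a checked runtime type test.
  def get[T](key: String, default: T)(implicit tag: ClassTag[T]): T =
    values.get(key) match {
      case Some(v: T) => v
      case Some(other) =>
        throw new IllegalArgumentException(
          s"config key '$key' holds ${other.getClass.getName}, expected ${tag.runtimeClass.getName}")
      case None => default
    }

  // Sets or overrides a value at runtime.
  def set(key: String, value: Any): Unit = values(key) = value
}
```

Usage mirrors the proposal: `conf.get[String]("eagle.stream.name", "eventStream")` falls back to `"eventStream"` when the key is missing.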
> h1. Sample Application
> {code}
> case class Entity(name:String,value:Double,var inc:Int=0)
> val tuples = Seq(
> Entity("a", 1),
> Entity("a", 2),
> Entity("a", 3),
> Entity("b", 2),
> Entity("c", 3),
> Entity("d", 3)
> )
> val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
> // Types flow through the DAG automatically: Entity -> Tuple2 -> Tuple3
> env.from(tuples)
> .groupByKey(_.name)
> .map(o => {o.inc += 2;o})
> .filter(_.name != "b")
> .filter(_.name != "c")
> .groupByKey(o=>(o.name,o.value))
> .map(o => (o.name,o))
> .map(o => (o._1,o._2.value,o._2.inc))
> .foreach(println)
> env.execute()
> {code}
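To make explicit what the sample computes, here is a local, single-process equivalent written with plain Scala collections. It is an illustrative sketch, not how Eagle executes the DAG: the two `groupByKey` calls only decide which task processes each element, so they are omitted, and `copy` replaces the in-place `o.inc += 2` mutation. In a distributed run with higher parallelism, the order in which `foreach` sees elements is not guaranteed.

```scala
object SamplePipeline {
  // Immutable variant of the sample's Entity; `inc` is updated via copy.
  final case class Entity(name: String, value: Double, inc: Int = 0)

  val tuples: Seq[Entity] = Seq(
    Entity("a", 1), Entity("a", 2), Entity("a", 3),
    Entity("b", 2), Entity("c", 3), Entity("d", 3)
  )

  // Same map/filter chain as the sample, minus the grouping steps.
  def run(input: Seq[Entity]): Seq[(String, Double, Int)] =
    input
      .map(o => o.copy(inc = o.inc + 2))
      .filter(_.name != "b")
      .filter(_.name != "c")
      .map(o => (o.name, o))
      .map(o => (o._1, o._2.value, o._2.inc))
}
```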
> Types stay transparent to the developer in both the DAG compilation
> (programming) phase and the runtime (metadata) phase:
> {code}
> 2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before expanded DAG
> {
> IterableStreamProducer[Entity]_1{1} -> GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
> GroupByKeyProducer[<function1>(Entity)]_2{1} -> MapProducer[Entity]_3{1} in shuffleGroup
> MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
> FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
> FilterProducer[Entity]_5{1} -> GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
> GroupByKeyProducer[<function1>(Entity)]_6{1} -> MapProducer[Tuple2]_7{1} in shuffleGroup
> MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
> MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
> }
> 2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After expanded DAG
> {
> IterableStreamProducer[Entity]_1{1} -> MapProducer[Entity]_3{1} in groupByKey(<function1>)
> MapProducer[Entity]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
> FilterProducer[Entity]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
> FilterProducer[Entity]_5{1} -> MapProducer[Tuple2]_7{1} in groupByKey(<function1>)
> MapProducer[Tuple2]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
> MapProducer[Tuple3]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
> }
> 2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]: Storm topology DAG
> {
> Spout[IterableStreamProducer[Entity]_1]{1} -> Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
> Bolt[MapProducer[Entity]_3 ]{1} -> Bolt[FilterProducer[Entity]_4]{1} in shuffleGroup
> Bolt[FilterProducer[Entity]_4 ]{1} -> Bolt[FilterProducer[Entity]_5]{1} in shuffleGroup
> Bolt[FilterProducer[Entity]_5 ]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in groupByKey(<function1>)
> Bolt[MapProducer[Tuple2]_7 ]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in shuffleGroup
> Bolt[MapProducer[Tuple3]_8 ]{1} -> Bolt[ForeachProducer[void]_9]{1} in shuffleGroup
> }
> {code}
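The difference between the "Before expanded" and "After expanded" DAGs in the log can be modelled as a small rewrite over a linear chain: a `GroupByKeyProducer` node appears to do no processing of its own, so it is removed and its key function becomes the grouping of the edge that bridges over it, while every other edge keeps `shuffleGroup`. This is a hypothetical model of that rewrite (`DagExpansion`, `Op`, `GroupByKey`, and `Edge` are illustrative names); it handles only linear chains, whereas Eagle's expansion works on a general DAG.

```scala
object DagExpansion {
  sealed trait Node { def id: Int }
  // A processing step such as MapProducer or FilterProducer.
  final case class Op(id: Int, label: String) extends Node
  // A grouping marker such as GroupByKeyProducer; `key` describes the key function.
  final case class GroupByKey(id: Int, key: String) extends Node

  final case class Edge(from: Op, to: Op, grouping: String)

  // Collapses GroupByKey nodes of a linear chain into the grouping of the next edge.
  def expand(chain: Seq[Node]): Seq[Edge] = {
    // State: edges so far, last real operator, pending key grouping (if any).
    val start = (Vector.empty[Edge], Option.empty[Op], Option.empty[String])
    val (edges, _, _) = chain.foldLeft(start) {
      // Remember the key grouping; the marker node itself produces no edge.
      case ((acc, prev, _), g: GroupByKey) =>
        (acc, prev, Some(s"groupByKey(${g.key})"))
      // Connect the previous operator to this one, using any pending key grouping.
      case ((acc, prev, pending), op: Op) =>
        val next = prev.fold(acc)(p => acc :+ Edge(p, op, pending.getOrElse("shuffleGroup")))
        (next, Some(op), None)
    }
    edges
  }
}
```

Feeding it the sample's chain (ids 1 to 9, with grouping nodes 2 and 6) reproduces the edge list of the "After expanded DAG" log.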
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)