Missed another change about *Configuration*:

https://github.com/haoch/incubator-eagle/blob/EAGLE-66-TYPESAFE/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/Configuration.scala

*val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)*
  val streamName = *env.config.get[String]("eagle.stream.name
<http://eagle.stream.name>","eventStream")*
  val streamExecutorId =
env.config.get[String]("eagle.stream.executor",s"${streamName}Executor")

*env.config.set("dataSourceConfig.deserializerClass",classOf[JsonMessageDeserializer].getCanonicalName)*


Regards,
Hao

On Mon, Dec 7, 2015 at 3:59 PM, Hao Chen <[email protected]> wrote:

> *TODO*
>
> ✓ Typesafe API
> ✓ groupByKey
> ☐ Test compatibility for old application
> ☐ Refactor package to seperate core/storm/spark/others
> ☐ Support KeyValue based grouping for old application and existing
> executor / spout
> ☐ Support groupByKey(fields: String*)
> ☐ Refactor Alert Executor with groupByKey and StreamInfo, avoid passing
> upStreamNames
> ☐ Siddhi support as general engine: sql(String)
> ☐ Refactor Hdfs Schema Manifest (High coupling for developers now )
> ☐ State Management: DAG flow, Performance Metrics, Visualization
> ☐ Java compatible Interface for function type parameters like mapper,
> filter and so on referring keySelector
> ☐ Refactor configuration style (maybe Qingwen could help about it)
> ☐ DAG Serialization/Deserialization (function serialization? json ->
> function? only support SQL at first phase)
>
> Regards,
> Hao
>
> On Mon, Dec 7, 2015 at 3:47 PM, Hao Chen <[email protected]> wrote:
>
>>
>> JIRA: https://issues.apache.org/jira/browse/EAGLE-66
>> Pull Request: https://github.com/apache/incubator-eagle/pull/17
>>
>> *Main Changes*
>>
>> 1. *Typesafe API:* Currently the stream processing API is not type-safe
>> (Type info are erased by stream framework), all programming interfaces for
>> developer are faced to Object/AnyRef, which is not very friendly and
>> extensible for framework user (application developer):
>>
>> public void flatMap(java.util.List<*Object*> input,
>> Collector<Tuple2<String, AlertAPIEntity>> outputCollector)
>>
>> So i propose the interface as (all type information are transparent for
>> developer, not need additional parameters, supported by Scala implicit
>> TypeTag)
>>
>> *class StreamProducer[+T <: Any] extends StreamInfo with
>> StreamProtocol[T]*
>>
>>    - *StreamInfo: * contains Stream core information including streamId,
>>    processing element id/name, entity type info (class/TypeTag)
>>    - *StreamProtocol extends JavaStreamProtocol*: contains basic Stream
>>    DSL API and java-compatible API
>>
>> class *StreamInfo*  extends Serializable{
>>   val id:Int = UniqueId.incrementAndGetId()
>>   var name: String = null
>>   var streamId:String=null
>>   var parallelismNum: Int = 1
>>   var inKeyed:Boolean = false
>>   var outKeyed:Boolean = false
>>   var keySelector:KeySelector = null
>>
>>   var typeClass:Class[_] = classOf[AnyRef]
>>   @transient  implicit var *typeTag:ru.TypeTag[*_] = ru.typeTag[AnyRef]
>> }
>>
>> 2. *KeyValue Based Structure: *currently framework user (developer) have
>> to handle with field declaration again and again, and framework and
>> business logic are highly coupled, according to the StreamProtocol
>> description, user should not care about framework level detail like
>> internal data structure for Storm using List<Object> with Fields<String>
>> which is not friendly for developer, we should make sure user focus on
>> business logic only like:
>>
>> env.from(tuples)
>> .groupByKey(_.name)
>>
>> 3. *Spout grouping instead of overriding Schema*: currently especially
>> in use case like HdfsAuditLog Monitoring, if developer wants to  groupby
>> certain key, they are forced to override Schema (specific for storm) ,
>> which is not good and un-reusable.
>>
>> 4. *Environment Decoupled: *currently the stream (metadata) /dag (logic)
>> / environment (execution) are coupled with storm internal implementation,
>> which is not good for becoming a metadata-driven pipeline framework in
>> future, so we should decouple it.
>>
>> *5. Compatible with Field-based Structure in old framework and
>> application.*
>>
>> *Sample Application*
>>
>> case class Entity(name:String,value:Double,var inc:Int=0)
>>
>> val tuples = Seq(
>> Entity("a", 1),
>> Entity("a", 2),
>> Entity("a", 3),
>> Entity("b", 2),
>> Entity("c", 3),
>> Entity("d", 3)
>> )
>>
>> val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
>>
>>
>> *// DAG is fully automatically aware: Entity -> Tuple2 -> Tuple3 *
>>
>> env.from(tuples)
>> *.groupByKey(_.name)*
>> .map(o => {o.inc += 2;o})
>> .filter(_.name != "b")
>> .filter(_.name != "c")
>> *.groupByKey(o=>(o.name <http://o.name>,o.value))*
>> .map(o => (o.name,o))
>> .map(o => (o._1,o._2.value,o._2.inc))
>> .foreach(println)
>>
>>  env.execute()
>>
>>
>> *Type is transparent for developer during both DAG compiling
>> (programming) and runtime (metadata) phases*
>>
>> 2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before
>> expanded DAG
>> {
>> IterableStreamProducer[*Entity*]_1{1} ->
>> GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
>> GroupByKeyProducer[<function1>(*Entity*)]_2{1} ->
>> MapProducer[Entity]_3{1} in shuffleGroup
>> MapProducer[*Entity]*_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
>> FilterProducer[*Entity*]_4{1} -> FilterProducer[Entity]_5{1} in
>> shuffleGroup
>> FilterProducer[*Entity*]_5{1} ->
>> GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
>> GroupByKeyProducer[<function1>(*Entity*)]_6{1} ->
>> MapProducer[Tuple2]_7{1} in shuffleGroup
>> MapProducer[*Tuple2*]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
>> MapProducer[*Tuple3*]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
>> }
>> 2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After
>> expanded DAG
>> {
>> IterableStreamProducer[*Entity*]_1{1} -> MapProducer[Entity]_3{1} in
>> groupByKey(<function1>)
>> MapProducer[*Entity*]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
>> FilterProducer[*Entity*]_4{1} -> FilterProducer[Entity]_5{1} in
>> shuffleGroup
>> FilterProducer[*Entity*]_5{1} -> MapProducer[Tuple2]_7{1} in
>> groupByKey(<function1>)
>> MapProducer[*Tuple2*]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
>> MapProducer[*Tuple3*]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
>> }
>> 2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]:
>> Storm topology DAG
>> {
>>   Spout[IterableStreamProducer[*Entity*]_1]{1} ->
>> Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
>> Bolt[MapProducer[*Entity*]_3 ]{1} -> Bolt[FilterProducer[Entity]_4]{1}
>> in shuffleGroup
>> Bolt[FilterProducer[*Entity*]_4 ]{1} ->
>> Bolt[FilterProducer[Entity]_5]{1} in shuffleGroup
>> Bolt[FilterProducer[*Entity*]_5 ]{1} -> Bolt[MapProducer[Tuple2]_7]{1}
>> in groupByKey(<function1>)
>> Bolt[MapProducer[*Tuple2*]_7 ]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in
>> shuffleGroup
>> Bolt[MapProducer[*Tuple3*]_8 ]{1} -> Bolt[ForeachProducer[void]_9]{1} in
>> shuffleGroup
>> }
>>
>>
>> Regards,
>> Hao
>>
>
>

Reply via email to