JIRA: https://issues.apache.org/jira/browse/EAGLE-66
Pull Request: https://github.com/apache/incubator-eagle/pull/17

*Main Changes*

1. *Typesafe API:* Currently the stream processing API is not type-safe
(type information is erased by the stream framework), so every programming
interface exposed to developers works on Object/AnyRef. This is neither
friendly nor extensible for framework users (application developers):

public void flatMap(java.util.List<*Object*> input,
Collector<Tuple2<String, AlertAPIEntity>> outputCollector)

So I propose the interface below. All type information is transparent to the
developer and needs no additional parameters, because it is captured through
Scala's implicit TypeTag:

*class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[T]*

   - *StreamInfo:* contains core stream information, including streamId,
   processing element id/name, and entity type info (class/TypeTag)
   - *StreamProtocol extends JavaStreamProtocol:* contains the basic Stream DSL
   API and the Java-compatible API

import scala.reflect.runtime.{universe => ru}

class StreamInfo extends Serializable {
  val id: Int = UniqueId.incrementAndGetId()
  var name: String = null
  var streamId: String = null
  var parallelismNum: Int = 1
  var inKeyed: Boolean = false
  var outKeyed: Boolean = false
  var keySelector: KeySelector = null

  // Entity type info, kept both as a runtime class and as a TypeTag
  var typeClass: Class[_] = classOf[AnyRef]
  @transient implicit var typeTag: ru.TypeTag[_] = ru.typeTag[AnyRef]
}
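
To illustrate how implicit type evidence keeps the element type visible without extra parameters, here is a minimal sketch. It is not the StreamProducer from the pull request: TypedStream is a hypothetical stand-in, and it uses ClassTag from the Scala standard library in place of TypeTag so that it runs without scala-reflect.

```scala
import scala.reflect.ClassTag

// Hypothetical, simplified stand-in for StreamProducer[T]; not the Eagle class.
final class TypedStream[T](val elements: Seq[T])(implicit val tag: ClassTag[T]) {
  // The compiler supplies the tag; callers never pass type info by hand.
  def typeClass: Class[_] = tag.runtimeClass

  // Each transformation captures the tag of its own output type.
  def map[R: ClassTag](f: T => R): TypedStream[R] =
    new TypedStream(elements.map(f))

  def filter(p: T => Boolean): TypedStream[T] =
    new TypedStream(elements.filter(p))
}

object TypedStream {
  def from[T: ClassTag](elements: Seq[T]): TypedStream[T] =
    new TypedStream(elements)
}

// Same shape as the Entity used in the sample application.
case class Entity(name: String, value: Double, var inc: Int = 0)
```

With this sketch, TypedStream.from(Seq(Entity("a", 1))).typeClass is the Entity class, and mapping to a pair yields a stream whose typeClass is Tuple2, without the caller declaring either.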

2. *KeyValue Based Structure:* Currently framework users (developers) have to
handle field declarations again and again, and framework and business logic
are tightly coupled. According to the StreamProtocol description, users
should not have to care about framework-level details such as the internal
data structure (for Storm, List<Object> with Fields<String>), which is not
developer-friendly. Users should focus on business logic only, like:

env.from(tuples)
.groupByKey(_.name)
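
As a rough sketch of why a key-selector function can replace field declarations: the framework only needs the selector to decide where an element goes. KeyedRouter below is a hypothetical helper, not part of Eagle or Storm, and hash-modulo routing is an assumption made for illustration.

```scala
// Same shape as the Entity used in the sample application.
case class Entity(name: String, value: Double, var inc: Int = 0)

// Hypothetical helper: picks a partition from a user-supplied key selector,
// so user code never declares field names or touches List<Object>.
final class KeyedRouter[T, K](keySelector: T => K, parallelism: Int) {
  require(parallelism > 0, "parallelism must be positive")

  // Elements with equal keys always map to the same partition.
  def partitionOf(element: T): Int =
    Math.floorMod(keySelector(element).##, parallelism)
}
```

For example, new KeyedRouter[Entity, String](_.name, 4) sends every Entity with the same name to the same one of four partitions.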

3. *Spout grouping instead of overriding Schema:* Currently, especially in
use cases like HdfsAuditLog Monitoring, developers who want to group by a
certain key are forced to override the Schema (which is Storm-specific).
This is awkward and not reusable.

4. *Environment Decoupled:* Currently the stream (metadata), DAG (logic) and
environment (execution) are coupled with Storm's internal implementation,
which stands in the way of becoming a metadata-driven pipeline framework in
the future, so we should decouple them.
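
One possible shape for this decoupling, sketched with hypothetical names only (Step, StreamDag and the two environments below are not Eagle classes): the DAG is plain data, and each environment decides how to turn it into something executable.

```scala
// Hypothetical names throughout; none of these are Eagle classes.
// The DAG is plain data: it says what to run, not how to run it.
final case class Step(name: String, parallelism: Int = 1)
final case class StreamDag(steps: Seq[Step])

// An environment maps the same description onto its own execution model.
trait ExecutionEnvironment {
  def describe(dag: StreamDag): Seq[String]
}

object StormLikeEnvironment extends ExecutionEnvironment {
  // Assumption for illustration: first step is a spout, the rest are bolts.
  def describe(dag: StreamDag): Seq[String] =
    dag.steps.zipWithIndex.map { case (s, i) =>
      val kind = if (i == 0) "Spout" else "Bolt"
      s"$kind[${s.name}]{${s.parallelism}}"
    }
}

object LocalEnvironment extends ExecutionEnvironment {
  // A second environment can reuse the identical DAG description.
  def describe(dag: StreamDag): Seq[String] =
    dag.steps.map(s => s"Local[${s.name}]")
}
```

Because the DAG carries no Storm types, swapping the environment does not require touching the stream definition.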

*5. Compatible with the field-based structure used by the old framework and existing applications.*

*Sample Application*

case class Entity(name:String,value:Double,var inc:Int=0)

val tuples = Seq(
Entity("a", 1),
Entity("a", 2),
Entity("a", 3),
Entity("b", 2),
Entity("c", 3),
Entity("d", 3)
)

val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)


*// The DAG tracks element types automatically: Entity -> Tuple2 -> Tuple3*

env.from(tuples)
*.groupByKey(_.name)*
.map(o => {o.inc += 2;o})
.filter(_.name != "b")
.filter(_.name != "c")
*.groupByKey(o => (o.name, o.value))*
.map(o => (o.name,o))
.map(o => (o._1,o._2.value,o._2.inc))
.foreach(println)

 env.execute()


*Types are transparent to the developer during both the DAG compilation
(programming) and runtime (metadata) phases*

2015-12-07 15:17:19,820 INFO [main] utils.GraphPrinter$[43]: Before
expanded DAG
{
IterableStreamProducer[*Entity*]_1{1} ->
GroupByKeyProducer[<function1>(Entity)]_2{1} in shuffleGroup
GroupByKeyProducer[<function1>(*Entity*)]_2{1} -> MapProducer[Entity]_3{1}
in shuffleGroup
MapProducer[*Entity*]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
FilterProducer[*Entity*]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
FilterProducer[*Entity*]_5{1} ->
GroupByKeyProducer[<function1>(Entity)]_6{1} in shuffleGroup
GroupByKeyProducer[<function1>(*Entity*)]_6{1} -> MapProducer[Tuple2]_7{1}
in shuffleGroup
MapProducer[*Tuple2*]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
MapProducer[*Tuple3*]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
}
2015-12-07 15:17:19,852 INFO [main] utils.GraphPrinter$[43]: After expanded
DAG
{
IterableStreamProducer[*Entity*]_1{1} -> MapProducer[Entity]_3{1} in
groupByKey(<function1>)
MapProducer[*Entity*]_3{1} -> FilterProducer[Entity]_4{1} in shuffleGroup
FilterProducer[*Entity*]_4{1} -> FilterProducer[Entity]_5{1} in shuffleGroup
FilterProducer[*Entity*]_5{1} -> MapProducer[Tuple2]_7{1} in
groupByKey(<function1>)
MapProducer[*Tuple2*]_7{1} -> MapProducer[Tuple3]_8{1} in shuffleGroup
MapProducer[*Tuple3*]_8{1} -> ForeachProducer[void]_9{1} in shuffleGroup
}
2015-12-07 15:17:19,898 INFO [main] storm.StormTopologyCompiler$[92]: Storm
topology DAG
{
  Spout[IterableStreamProducer[*Entity*]_1]{1} ->
Bolt[MapProducer[Entity]_3]{1} in groupByKey(<function1>)
Bolt[MapProducer[*Entity*]_3 ]{1} -> Bolt[FilterProducer[Entity]_4]{1} in
shuffleGroup
Bolt[FilterProducer[*Entity*]_4 ]{1} -> Bolt[FilterProducer[Entity]_5]{1}
in shuffleGroup
Bolt[FilterProducer[*Entity*]_5 ]{1} -> Bolt[MapProducer[Tuple2]_7]{1} in
groupByKey(<function1>)
Bolt[MapProducer[*Tuple2*]_7 ]{1} -> Bolt[MapProducer[Tuple3]_8]{1} in
shuffleGroup
Bolt[MapProducer[*Tuple3*]_8 ]{1} -> Bolt[ForeachProducer[void]_9]{1} in
shuffleGroup
}
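
The difference between the "before expanded" and "after expanded" DAGs above is that each GroupByKeyProducer node disappears and its grouping moves onto the edge that bypasses it. The sketch below shows that rewrite on a simplified, hypothetical graph model (Op, GroupBy, Edge and DagExpansion are illustrative names, not Eagle's classes), and it assumes GroupBy nodes are never chained directly to each other.

```scala
// Hypothetical, simplified DAG model; not Eagle's classes.
sealed trait Node { def id: Int }
final case class Op(id: Int, label: String) extends Node
final case class GroupBy(id: Int, key: String) extends Node

final case class Edge(from: Node, to: Node, grouping: String = "shuffleGroup")

object DagExpansion {
  // Drop every GroupBy node and attach its key to the edge that bypasses it.
  def expand(edges: Seq[Edge]): Seq[Edge] = {
    // For each GroupBy node, remember the node that feeds it.
    val upstream: Map[Int, Node] =
      edges.collect { case Edge(from, g: GroupBy, _) => g.id -> from }.toMap

    def rewrite(edge: Edge): Seq[Edge] = edge match {
      // An edge into a GroupBy node is removed.
      case Edge(_, _: GroupBy, _) => Seq.empty
      // An edge out of a GroupBy node is reconnected to the upstream node.
      case Edge(g: GroupBy, to, _) =>
        upstream.get(g.id).map(from => Edge(from, to, s"groupByKey(${g.key})")).toSeq
      // Every other edge is kept unchanged.
      case other => Seq(other)
    }

    edges.flatMap(rewrite)
  }
}
```

Applied to source -> groupBy -> map -> filter, this yields source -> map with a groupByKey grouping, followed by the unchanged map -> filter edge in shuffleGroup.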


Regards,
Hao
