[ https://issues.apache.org/jira/browse/EAGLE-66?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hao Chen updated EAGLE-66:
--------------------------
    Description: 
First, in the application developer space, developers should not need to care
about how internal data structures are exchanged between processing elements.
The DSL should let users process a stream just like a collection of typed
objects. For example, in the following case both styles should be supported:
In a type-safe way: groupBy(_.user)
In a field-based way: groupBy("user")

@StreamDef("hdfsAuditLogStream")
case class AuditLog(timestamp:Long,user:String,path:String,cmd:String)

object TypedSafeApp extends App{
  val ec = ExecutionContext.get[StormExecutionContext]
  ec.fromKafka(ec.getConfig){ bytes =>
    AuditLog(1000,"someone","/tmp/private","open")
  }.parallism(123)                       // Stream[AuditLog]
    .groupBy(_.user).parallism(12)       // Stream[AuditLog]
    .map { obj => (obj.user,obj) }       // Stream[(String, AuditLog)]
    .groupBy(_._1)                       // Stream[(String, AuditLog)]
    .map(_._2)                           // Stream[AuditLog]
    .alert.persistAndEmail               // Stream[AlertAPIEntity]
}
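A minimal in-memory sketch of the two groupBy styles, assuming a hypothetical LocalStream[T] in place of the proposed Stream[T] (no Storm or Kafka involved). As in the example above, groupBy keeps the element type unchanged; the `keyOf` field and the `groups` helper are illustrative only and are not part of Eagle.

```scala
// Hypothetical in-memory stand-in for the proposed Stream[T]; not Eagle's API.
final case class AuditLog(timestamp: Long, user: String, path: String, cmd: String)

final class LocalStream[T](val items: Seq[T], val keyOf: Option[T => Any] = None) {
  // map changes the element type and the compiler tracks it.
  // In this sketch the result is a fresh, ungrouped stream.
  def map[R](f: T => R): LocalStream[R] = new LocalStream(items.map(f))

  // Type-safe style: the key extractor is checked against T at compile time,
  // so a typo such as groupBy(_.usr) does not compile. T stays the same.
  def groupBy(key: T => Any): LocalStream[T] = new LocalStream(items, Some(key))

  // Field-based style: the key is read through the public accessor of the
  // same name at run time, so a misspelled name only fails when evaluated.
  def groupBy(field: String): LocalStream[T] = {
    val byName: T => Any = t => {
      val ref = t.asInstanceOf[AnyRef]
      ref.getClass.getMethod(field).invoke(ref)
    }
    new LocalStream(items, Some(byName))
  }

  // Materializes records per key so the effect of groupBy is observable.
  // An ungrouped stream puts every record under a single placeholder key.
  def groups: Map[Any, Seq[T]] = keyOf match {
    case Some(k) => items.groupBy(k)
    case None    => Map[Any, Seq[T]](() -> items)
  }
}
```

Both styles produce the same grouping; the difference is when a mistake surfaces: a wrong extractor fails at compile time, while a wrong field name fails at run time.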

In the framework space, especially for groupBy semantics, nothing should look
different to the developer, but internally the exchanged structure is
(AnyRef, AuditLog):
For groupBy(_.user) with obj of type AuditLog, it is always
(obj.user, obj) + fieldsGrouping by field 0 for Storm
For groupBy("user") with obj of type AuditLog, it is
(readValueByReflect("user", obj), obj) + fieldsGrouping by field 0 for Storm
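A sketch of the (AnyRef, T) envelope described above, under stated assumptions: `readValueByReflect` is named in the proposal, but its body here (calling the case class accessor through Java reflection) is an assumption, and `GroupByEnvelope`, `keyByFunction`, `keyByField` and `taskFor` are hypothetical names. `taskFor` is a simple hash-modulo stand-in for Storm's fieldsGrouping on field 0, not Storm's actual hashing.

```scala
// Hypothetical sketch of the internal exchange structure; not Eagle's code.
final case class AuditLog(timestamp: Long, user: String, path: String, cmd: String)

object GroupByEnvelope {
  // What travels between processing elements: (key, original object).
  type Keyed[T] = (AnyRef, T)

  // groupBy(_.user): the key comes from the compiled extractor function.
  def keyByFunction[T](key: T => AnyRef)(obj: T): Keyed[T] = (key(obj), obj)

  // Reads a field through the public no-arg accessor Scala generates for each
  // case class parameter. Throws NoSuchMethodException for an unknown name.
  def readValueByReflect(field: String, obj: AnyRef): AnyRef =
    obj.getClass.getMethod(field).invoke(obj)

  // groupBy("user"): the key is read reflectively at run time.
  def keyByField[T <: AnyRef](field: String)(obj: T): Keyed[T] =
    (readValueByReflect(field, obj), obj)

  // Stand-in for fieldsGrouping on position 0: tuples whose first element is
  // equal are routed to the same task index in [0, numTasks).
  def taskFor(keyed: (AnyRef, Any), numTasks: Int): Int =
    Math.floorMod(keyed._1.hashCode, numTasks)
}
```

Because both paths put the key at position 0, the downstream side can use one grouping rule (fieldsGrouping on the first field in Storm) regardless of which groupBy style the developer wrote.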

  was:

(Identical to the description above, except that it also contained this link
after the groupBy examples:
https://github.corp.ebay.com/gist/hchen9/fbcf1af39134ba09b27d#file-typesafed-api-eagle-scala-L95-L145)


> Eagle TypeSafe Stream Processing DSL
> ------------------------------------
>
>                 Key: EAGLE-66
>                 URL: https://issues.apache.org/jira/browse/EAGLE-66
>             Project: Eagle
>          Issue Type: Improvement
>    Affects Versions: 0.3.0
>            Reporter: Hao Chen
>            Assignee: Hao Chen
>             Fix For: 0.3.0



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
