[ https://issues.apache.org/jira/browse/EAGLE-66?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hao Chen updated EAGLE-66:
--------------------------
Description:
First, in the application developer space, developers should not need to care
about the internal data structures exchanged between processing elements. The
DSL should let users process a stream just like a collection of typed objects.
For example, in the following case we should support both:
In a type-safe way: groupBy(_.user)
In a field-based way: groupBy("user")
@StreamDef("hdfsAuditLogStream")
case class AuditLog(timestamp: Long, user: String, path: String, cmd: String)

object TypedSafeApp extends App {
  val ec = ExecutionContext.get[StormExecutionContext]
  ec.fromKafka(ec.getConfig) { bytes =>
      // placeholder: a real application would deserialize `bytes` here
      AuditLog(1000, "someone", "/tmp/private", "open")
    }.parallelism(123)                // Stream[AuditLog]
    .groupBy(_.user).parallelism(12)  // Stream[AuditLog]
    .map { obj => (obj.user, obj) }   // Stream[(String, AuditLog)]
    .groupBy(_._1)                    // Stream[(String, AuditLog)]
    .map(_._2)                        // Stream[AuditLog]
    .alert.persistAndEmail            // Stream[AlertAPIEntity]
}
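To make the developer-facing contract concrete, here is a minimal in-memory sketch, not Eagle's implementation. `TypedStreamSketch`, `TypedStream`, `from` and `keys` are hypothetical names. It only shows that `groupBy` keeps the element type while `map` changes it, mirroring the type comments in the example above.

```scala
object TypedStreamSketch {
  // Same record as in the issue; the @StreamDef annotation is omitted here.
  final case class AuditLog(timestamp: Long, user: String, path: String, cmd: String)

  // Hypothetical in-memory stand-in for the DSL's Stream[T]. It only keeps
  // the elements and the current grouping key; it builds no Storm topology.
  final class TypedStream[T] private (val elements: Seq[T], val groupKey: Option[T => Any]) {
    // map changes the element type, e.g. Stream[AuditLog] => Stream[(String, AuditLog)];
    // the previous grouping key no longer applies, so it is dropped
    def map[R](f: T => R): TypedStream[R] = new TypedStream[R](elements.map(f), None)

    // groupBy keeps the element type T and only records how to route elements
    def groupBy(key: T => Any): TypedStream[T] = new TypedStream[T](elements, Some(key))

    // The key each element would be routed by, if the stream is grouped
    def keys: Option[Seq[Any]] = groupKey.map(k => elements.map(k))
  }

  object TypedStream {
    def from[T](xs: Seq[T]): TypedStream[T] = new TypedStream[T](xs, None)
  }
}
```

For instance, `TypedStream.from(logs).groupBy(_.user).map(obj => (obj.user, obj)).groupBy(_._1).map(_._2)` yields a `TypedStream[AuditLog]` again, and the compiler checks every step.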
In the framework space, especially for groupBy semantics, nothing should look
different to the developer, but internally the exchanged structure is
(AnyRef, AuditLog), i.e. (grouping key, original object).
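A minimal sketch of that envelope, assuming the framework simply pairs the key with the object before emitting and strips the key again before calling user code. `ExchangeSketch`, `Exchange`, `wrap` and `unwrap` are hypothetical names, not Eagle APIs.

```scala
object ExchangeSketch {
  final case class AuditLog(timestamp: Long, user: String, path: String, cmd: String)

  // Hypothetical internal envelope: position 0 is the grouping key,
  // position 1 is the untouched user object.
  type Exchange[T] = (AnyRef, T)

  // Framework side: wrap an element with the key chosen by groupBy
  def wrap[T](obj: T)(key: T => AnyRef): Exchange[T] = (key(obj), obj)

  // Framework side: hand only the original object back to user code
  def unwrap[T](msg: Exchange[T]): T = msg._2
}
```

User code never sees the envelope: it receives `unwrap(msg)`, which is the same `AuditLog` it emitted.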
For groupBy(_.user) on an object obj of type AuditLog, the exchanged tuple is
always (obj.user, obj), with a Storm fieldsGrouping on field 0.
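The type-safe case could be sketched as follows. `FieldsGroupingSketch`, `toExchange` and `chooseTask` are hypothetical, and `chooseTask` only approximates the guarantee a Storm fieldsGrouping on field 0 provides (equal keys always reach the same downstream task); Storm's actual hash computation is not reproduced here.

```scala
object FieldsGroupingSketch {
  final case class AuditLog(timestamp: Long, user: String, path: String, cmd: String)

  // groupBy(_.user): the key comes from the user's function, so the
  // exchanged tuple is always (obj.user, obj).
  def toExchange(obj: AuditLog): (AnyRef, AuditLog) = (obj.user, obj)

  // Approximation of fieldsGrouping on field 0: choose a downstream task
  // from the hash of the key alone, ignoring the payload in field 1.
  def chooseTask(tuple: (AnyRef, AuditLog), numTasks: Int): Int = {
    require(numTasks > 0, "numTasks must be positive")
    Math.floorMod(tuple._1.hashCode, numTasks)
  }
}
```

Because only field 0 is hashed, two audit events for the same user land on the same task even when their paths and commands differ.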
For groupBy("user") on an object obj of type AuditLog, it is
(readValueByReflect("user", obj), obj), again with a Storm fieldsGrouping on
field 0.
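readValueByReflect is not spelled out in the issue. One plausible implementation, assuming the stream type is a Scala case class, calls the public accessor method the compiler generates for each constructor field. `ReflectSketch` and `toExchange` are hypothetical names.

```scala
object ReflectSketch {
  final case class AuditLog(timestamp: Long, user: String, path: String, cmd: String)

  // Hypothetical readValueByReflect: a case class exposes each constructor
  // field through a public zero-argument accessor of the same name, so Java
  // reflection can look it up and call it by name. Primitive fields such as
  // timestamp come back boxed (java.lang.Long).
  def readValueByReflect(field: String, obj: AnyRef): AnyRef =
    obj.getClass.getMethod(field).invoke(obj)

  // groupBy("user"): the key is resolved by name at runtime, then the
  // exchanged tuple is (key, obj), the same shape as the type-safe case.
  def toExchange(field: String, obj: AuditLog): (AnyRef, AuditLog) =
    (readValueByReflect(field, obj), obj)
}
```

The trade-off shows up here: a misspelled name such as "usr" compiles but fails at runtime with NoSuchMethodException, whereas groupBy(_.usr) is rejected by the compiler.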
was:
The same description, which additionally linked to the original gist:
https://github.corp.ebay.com/gist/hchen9/fbcf1af39134ba09b27d#file-typesafed-api-eagle-scala-L95-L145
> Eagle TypeSafe Stream Processing DSL
> ------------------------------------
>
> Key: EAGLE-66
> URL: https://issues.apache.org/jira/browse/EAGLE-66
> Project: Eagle
> Issue Type: Improvement
> Affects Versions: 0.3.0
> Reporter: Hao Chen
> Assignee: Hao Chen
> Fix For: 0.3.0
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)