Yes, this might be nice. Till and I had similar ideas about using this pattern to make broadcast variables more usable in Scala, in fact. :D
On Fri, 24 Jul 2015 at 17:39 Gyula Fóra <gyf...@apache.org> wrote:

> Hey,
>
> I would like to propose a way to extend the standard Streaming Scala API
> methods (map, flatmap, filter etc) with versions that take stateful
> functions as lambdas. I think this would eliminate the awkwardness of
> implementing RichFunctions in Scala and make statefulness more explicit:
>
> *For example:*
> def map(statefulMap: (I, Option[S]) => (O, Option[S]))
> def flatMap(statefulFlatMap: (I, Option[S]) => (Traversable[O], Option[S]))
>
> This would be translated into RichMap and RichFlatMapFunctions that store
> Option[S] as OperatorState for fault tolerance.
>
> *Example rolling sum by key:*
> val input: DataStream[Long] = ...
> val sumByKey: DataStream[Long] =
>   input.keyBy(...).map( (next: Long, sum: Option[Long]) =>
>     sum match {
>       case Some(s) => (next + s, Some(next + s))
>       case None => (next, Some(next))
>     })
>
> What do you think?
>
> Gyula
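To make the proposed translation concrete, here is a minimal, framework-free sketch of what the generated wrapper could do. It does not use Flink at all: the class `StatefulMapper` and its `keyOf` parameter are hypothetical names, and a plain in-memory `mutable.Map` stands in for the keyed OperatorState that would provide fault tolerance in the real thing. Treating a returned `None` as "clear the state for this key" is also an assumption on my part, not something the proposal specifies.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the RichMapFunction the proposal would generate.
// The per-key mutable.Map simulates keyed OperatorState in memory only;
// it is NOT Flink's API and offers no fault tolerance.
final class StatefulMapper[K, I, O, S](
    keyOf: I => K,
    statefulMap: (I, Option[S]) => (O, Option[S])
) {
  private val state = mutable.Map.empty[K, S]

  def map(in: I): O = {
    val key = keyOf(in)
    // Hand the user lambda the current state for this key (None if unset).
    val (out, newState) = statefulMap(in, state.get(key))
    // Assumption: Some(s) overwrites the stored state, None clears it.
    newState match {
      case Some(s) => state.update(key, s)
      case None    => state.remove(key)
    }
    out
  }
}

// Rolling sum by key, mirroring the example in the proposal.
// Hypothetically, the key here is the record's parity (even vs. odd).
val rollingSum = new StatefulMapper[Boolean, Long, Long, Long](
  keyOf = (x: Long) => x % 2 == 0,
  statefulMap = (next: Long, sum: Option[Long]) =>
    sum match {
      case Some(s) => (next + s, Some(next + s))
      case None    => (next, Some(next))
    }
)

val outputs = List(1L, 2L, 3L, 4L, 5L).map(rollingSum.map)
```

Feeding 1, 2, 3, 4, 5 through it yields 1, 2, 4, 6, 9: odd and even values accumulate separately because each key sees only its own `Option[Long]`. The user lambda stays a pure function of input and previous state, and all the state bookkeeping lives in the wrapper, which is the point of the proposal.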