[
https://issues.apache.org/jira/browse/FLINK-2405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14642523#comment-14642523
]
ASF GitHub Bot commented on FLINK-2405:
---------------------------------------
Github user gyfora commented on a diff in the pull request:
https://github.com/apache/flink/pull/936#discussion_r35521195
--- Diff: flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala ---
@@ -332,13 +359,23 @@ class DataStreamTest {
assert(
getFunctionForDataStream(map
.filter((x: Int) => true))
- .isInstanceOf[FilterFunction[Int]])
-
+ .isInstanceOf[FilterFunction[_]])
+
+ val statefulFilter1 = src.filterWithState((in, state: Option[Long]) => (true, None))
+ assert(getFunctionForDataStream(statefulFilter1).isInstanceOf[FilterFunction[_]])
+ assert(!getFunctionForDataStream(statefulFilter1).
+ asInstanceOf[StatefulFunction[_, _, _]].isPartitioned)
+
+ val statefulFilter2 = src.keyBy(x=>x).filterWithState(
+ (in, state: Option[Long]) => (false, None), true)
+ assert(getFunctionForDataStream(statefulFilter2).
+ asInstanceOf[StatefulFunction[_, _, _]].isPartitioned)
+
--- End diff --
Alright, I will do that. :+1:
> Add stateful transformations using lambdas to the Streaming Scala API
> ---------------------------------------------------------------------
>
> Key: FLINK-2405
> URL: https://issues.apache.org/jira/browse/FLINK-2405
> Project: Flink
> Issue Type: New Feature
> Components: Scala API, Streaming
> Reporter: Gyula Fora
> Assignee: Gyula Fora
> Priority: Minor
>
> I propose to extend the Streaming Scala API methods (map, flatMap, filter,
> etc.) with versions that take stateful functions as lambdas. This would allow
> for a nice functional way of defining stateful transformations.
> Example:
> def mapWithState(fun: (I, Option[S]) => (O, Option[S]), stateByKey: Boolean = false)
> The optional stateByKey flag lets the user partition the state by key
> when the method is applied to a KeyedDataStream.
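
To make the proposed lambda shape concrete, here is a minimal standalone sketch that models it on a plain Seq instead of a Flink stream. It is not the implementation from the pull request: the object name, the keyOf parameter, and the in-memory state map are assumptions made for illustration only, and the sketch assumes that returning None simply leaves no state for the next call.

```scala
// Standalone sketch, NOT Flink code: models the proposed
// (I, Option[S]) => (O, Option[S]) lambda on an ordinary Seq.
object StatefulLambdaSketch {

  // Hypothetical helper. keyOf stands in for the key selector that a
  // keyed stream would already carry; it is only used if stateByKey is true.
  def mapWithState[I, O, S, K](input: Seq[I], keyOf: I => K, stateByKey: Boolean = false)(
      fun: (I, Option[S]) => (O, Option[S])): Seq[O] = {
    // One state slot per key when stateByKey is set, otherwise a single shared slot (None).
    val states = scala.collection.mutable.Map.empty[Option[K], Option[S]]
    input.map { in =>
      val slot: Option[K] = if (stateByKey) Some(keyOf(in)) else None
      val (out, next) = fun(in, states.getOrElse(slot, None))
      states(slot) = next // assumption: None means "no state" for the next call
      out
    }
  }

  def main(args: Array[String]): Unit = {
    val words = Seq("a", "b", "a", "a", "b")

    // Running count: the state is the number of elements seen so far.
    val count: (String, Option[Long]) => ((String, Long), Option[Long]) =
      (w, s) => { val c = s.getOrElse(0L) + 1; ((w, c), Some(c)) }

    println(mapWithState(words, (w: String) => w)(count))                    // one shared counter
    println(mapWithState(words, (w: String) => w, stateByKey = true)(count)) // one counter per word
  }
}
```

With stateByKey left at its default, all elements share a single counter; with stateByKey = true, each distinct word gets its own counter, which is the partitioning behaviour the flag is meant to select.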
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)