[ https://issues.apache.org/jira/browse/FLINK-6658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16021514#comment-16021514 ]
ASF GitHub Bot commented on FLINK-6658:
---------------------------------------
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/3963#discussion_r118055738
--- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala ---
@@ -296,25 +294,94 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
* timeout events wrapped in a [[Either]] type.
*/
def flatSelect[L: TypeInformation, R: TypeInformation](
- patternFlatTimeoutFunction: (mutable.Map[String, JList[T]], Long, Collector[L]) => Unit) (
- patternFlatSelectFunction: (mutable.Map[String, JList[T]], Collector[R]) => Unit)
+ patternFlatTimeoutFunction: (Map[String, Iterable[T]], Long, Collector[L]) => Unit) (
+ patternFlatSelectFunction: (Map[String, Iterable[T]], Collector[R]) => Unit)
: DataStream[Either[L, R]] = {
val cleanSelectFun = cleanClosure(patternFlatSelectFunction)
val cleanTimeoutFun = cleanClosure(patternFlatTimeoutFunction)
val patternFlatSelectFun = new PatternFlatSelectFunction[T, R] {
- override def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit = {
- cleanSelectFun(pattern.asScala, out)
+ override def flatSelect(pattern: JMap[String, JList[T]], out: Collector[R]): Unit =
+ cleanSelectFun(mapToScala(pattern), out)
+ }
+
+ val patternFlatTimeoutFun = new PatternFlatTimeoutFunction[T, L] {
+ override def timeout(
+ pattern: JMap[String, JList[T]],
+ timeoutTimestamp: Long, out: Collector[L])
+ : Unit = {
+ cleanTimeoutFun(mapToScala(pattern), timeoutTimestamp, out)
}
}
--- End diff --
Yes, I am also referring to those methods. :)
> Use scala Collections in scala CEP API
> --------------------------------------
>
> Key: FLINK-6658
> URL: https://issues.apache.org/jira/browse/FLINK-6658
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.3.0
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
>
--
This message was sent by Atlassian JIRA (v6.3.15#6346)