[
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249814#comment-15249814
]
ASF GitHub Bot commented on FLINK-3708:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1905#discussion_r60400262
--- Diff:
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import java.util.{Map => JMap}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.cep.{PatternFlatSelectFunction,
PatternSelectFunction, PatternStream => JPatternStream}
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.util.Collector
+import scala.reflect.ClassTag
+
+/**
+ * Stream abstraction for CEP pattern detection. A pattern stream is a
stream which emits detected
+ * pattern sequences as a map of events associated with their names. The
pattern is detected using a
+ * { @link org.apache.flink.cep.nfa.NFA}. In order to process the
detected sequences, the user
+ * has to specify a { @link PatternSelectFunction} or a { @link
PatternFlatSelectFunction}.
+ *
+ * @param jPatternStream Underlying pattern stream from Java API
+ * @tparam T Type of the events
+ */
+class PatternStream[T: TypeInformation : ClassTag](jPatternStream:
JPatternStream[T]) {
+
+ private[flink] def getWrappedPatternStream = jPatternStream
+
+ /**
+ * Applies a select function to the detected pattern sequence. For each
pattern sequence the
+ * provided { @link PatternSelectFunction} is called. The pattern
select function can produce
+ * exactly one resulting element.
+ *
+ * @param patternSelectFunction The pattern select function which is
called for each detected
+ * pattern sequence.
+ * @tparam R Type of the resulting elements
+ * @return { @link DataStream} which contains the resulting elements
from the pattern select
+ * function.
+ */
+ def select[R: TypeInformation : ClassTag](patternSelectFunction:
PatternSelectFunction[T, R]): DataStream[R] = {
+ asScalaStream(jPatternStream.select(patternSelectFunction,
implicitly[TypeInformation[R]]))
+ }
+
+ /**
+ * Applies a flat select function to the detected pattern sequence. For
each pattern sequence
+ * the provided { @link PatternFlatSelectFunction} is called. The
pattern flat select function
+ * can produce an arbitrary number of resulting elements.
+ *
+ * @param patternFlatSelectFunction The pattern flat select function
which is called for each
+ * detected pattern sequence.
+ * @tparam R Type of the resulting elements
+ * @return { @link DataStream} which contains the resulting elements
from the pattern flat select
+ * function.
+ */
+ def flatSelect[R: TypeInformation : ClassTag](patternFlatSelectFunction:
PatternFlatSelectFunction[T, R]): DataStream[R] = {
+ asScalaStream(jPatternStream.flatSelect(patternFlatSelectFunction,
implicitly[TypeInformation[R]]))
+ }
+
+ /**
+ * Applies a select function to the detected pattern sequence. For each
pattern sequence the
+ * provided { @link PatternSelectFunction} is called. The pattern
select function can produce
+ * exactly one resulting element.
+ *
+ * @param patternSelectFun The pattern select function which is called
for each detected
+ * pattern sequence.
+ * @tparam R Type of the resulting elements
+ * @return { @link DataStream} which contains the resulting elements
from the pattern select
+ * function.
+ */
+ def select[R: TypeInformation : ClassTag](patternSelectFun: JMap[String,
T] => R): DataStream[R] = {
--- End diff --
I think it would be good to pass a Scala `Map` rather than a Java `Map` to
`patternSelectFun`, as you've done in the commented-out code.
I think it would be good to use an explicit conversion via the
`JavaConverters` and the `asScala` method.
Semantically, it would be correct to provide an immutable map to the Scala
lambda. However, this would mean that we copy the map. Therefore, for the sake
of efficiency, we could simply pass in the result of the `asScala` call, which
is a mutable `Map`.
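A rough sketch of the conversion, independent of Flink, might look like the following. The object `SelectAdapterSketch` and the helper `adapt` are hypothetical names used only for illustration and are not part of the pull request. Typing the user function against `scala.collection.Map` is an assumption made here so that the mutable wrapper returned by `asScala` can be passed directly without a copy.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}
import scala.collection.JavaConverters._
import scala.collection.{Map => SMap}

object SelectAdapterSketch {

  // Hypothetical helper: turns a Scala function over a Scala map into a
  // function over java.util.Map. `asScala` only wraps the Java map, so no
  // entries are copied. An immutable copy would be `jMap.asScala.toMap`.
  def adapt[T, R](fun: SMap[String, T] => R): JMap[String, T] => R =
    (jMap: JMap[String, T]) => fun(jMap.asScala)

  def main(args: Array[String]): Unit = {
    // Stand-in for a detected pattern sequence: event name -> event.
    val pattern = new JHashMap[String, Int]()
    pattern.put("start", 1)
    pattern.put("end", 2)

    val sum = adapt[Int, Int](m => m("start") + m("end"))
    println(sum(pattern))
  }
}
```

Inside `select`, the same idea would mean wrapping `patternSelectFun` in a `PatternSelectFunction` whose `select` method calls the user function with `pattern.asScala`.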
> Scala API for CEP
> -----------------
>
> Key: FLINK-3708
> URL: https://issues.apache.org/jira/browse/FLINK-3708
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Assignee: Stefan Richter
>
> Currently, the CEP library does not support Scala case classes, because the
> {{TypeExtractor}} cannot handle them. In order to support them, it would be
> necessary to offer a Scala API for the CEP library.