[
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249609#comment-15249609
]
ASF GitHub Bot commented on FLINK-3708:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1905#discussion_r60383831
--- Diff:
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala.pattern
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+import scala.reflect.ClassTag
+
+/**
+ * Base class for a pattern definition.
+ * <p>
+ * A pattern definition is used by { @link
org.apache.flink.cep.nfa.compiler.NFACompiler} to create
+ * a { @link NFA}.
+ *
+ * <pre>{ @code
+ * Pattern<T, F> pattern = Pattern.<T>begin("start")
+ * .next("middle").subtype(F.class)
+ * .followedBy("end").where(new MyFilterFunction());
+ * }
+ * </pre>
+ *
+ * @param jPattern Underlying Java API Pattern
+ * @tparam T Base type of the elements appearing in the pattern
+ * @tparam F Subtype of T to which the current pattern operator is
constrained
+ */
+class Pattern[T: ClassTag, F <: T : ClassTag](jPattern: JPattern[T, F]) {
+
+ private[flink] def getWrappedPattern = jPattern
+
+
+ /**
+ *
+ * @return Name of the pattern operator
+ */
+ def getName: String = jPattern.getName
+
+ /**
+ *
+ * @return Window length in which the pattern match has to occur
+ */
+ def getWindowTime: Option[Time] = {
+ val time = jPattern.getWindowTime
+ if (time == null) None else Some(time)
+ }
+
+ /**
+ *
+ * @return Filter condition for an event to be matched
+ */
+ def getFilterFunction: Option[FilterFunction[F]] = {
+ val filterFun = jPattern.getFilterFunction
+ if (filterFun == null) None else Some(filterFun)
+ }
+
+ /**
+ * Applies a subtype constraint on the current pattern operator. This
means that an event has
+ * to be of the given subtype in order to be matched.
+ *
+ * @param clazz Class of the subtype
+ * @tparam S Type of the subtype
+ * @return The same pattern operator with the new subtype constraint
+ */
+ def subtype[S <: F : ClassTag](clazz: Class[S]): Pattern[T, S] = {
+ jPattern.subtype(clazz)
+ this.asInstanceOf[Pattern[T, S]]
+ }
+
+ /**
+ * Defines the maximum time interval for a matching pattern. This means
that the time gap
+ * between first and the last event must not be longer than the window
time.
+ *
+ * @param windowTime Time of the matching window
+ * @return The same pattern operator with the new window length
+ */
+ def within(windowTime: Time): Pattern[T, F] = {
+ jPattern.within(windowTime)
+ this
+ }
+
+ /**
+ * Appends a new pattern operator to the existing one. The new pattern
operator enforces strict
+ * temporal contiguity. This means that the whole pattern only matches
if an event which matches
+ * this operator directly follows the preceding matching event. Thus,
there cannot be any
+ * events in between two matching events.
+ *
+ * @param name Name of the new pattern operator
+ * @return A new pattern operator which is appended to this pattern
operator
+ */
+ def next(name: String): Pattern[T, T] = {
+ wrapPattern(jPattern.next(name))
+ }
+
+ /**
+ * Appends a new pattern operator to the existing one. The new pattern
operator enforces
+ * non-strict temporal contiguity. This means that a matching event of
this operator and the
+ * preceding matching event might be interleaved with other events
which are ignored.
+ *
+ * @param name Name of the new pattern operator
+ * @return A new pattern operator which is appended to this pattern
operator
+ */
+ def followedBy(name: String): FollowedByPattern[T, T] = {
+ FollowedByPattern(jPattern.followedBy(name))
+ }
+
+ /**
+ * Specifies a filter condition which has to be fulfilled by an event
in order to be matched.
+ *
+ * @param filter Filter condition
+ * @return The same pattern operator where the new filter condition is
set
+ */
+ def where(filter: FilterFunction[F]): Pattern[T, F] = {
+ jPattern.where(filter)
+ this
+ }
+
+ /**
+ * Specifies a filter condition which has to be fulfilled by an event
in order to be matched.
+ *
+ * @param filterFun Filter condition
+ * @return The same pattern operator where the new filter condition is
set
+ */
+ def where(filterFun: F => Boolean): Pattern[T, F] = {
+ val filter = new FilterFunction[F] {
+ val cleanFilter = cep.scala.cleanClosure(filterFun)
+
+ override def filter(value: F): Boolean = cleanFilter(value)
+ }
+ where(filter)
+ }
+
+ //TODO ask about java api change <?> -> <? extends T> and creating a new
object vs caching object. equals/hashcode?
+ /**
+ *
+ * @return The previous pattern operator
+ */
+ def getPrevious: Option[Pattern[T, _ <: T]] = {
+ val prev = jPattern.getPrevious
+ if (prev == null) None else Some(wrapPattern(prev))
+
+ }
+
+}
+
+object Pattern {
+
+ /**
+ * Constructs a new Pattern by wrapping a given Java API Pattern
+ *
+ * @param jPattern Underlying Java API Pattern.
+ * @tparam T Base type of the elements appearing in the pattern
+ * @tparam F Subtype of T to which the current pattern operator is
constrained
+ * @return New wrapping Pattern object
+ */
+ def apply[T: ClassTag, F <: T : ClassTag]
+ (jPattern: JPattern[T, F]) = new Pattern[T, F](jPattern)
--- End diff --
Does that fit in one line?
> Scala API for CEP
> -----------------
>
> Key: FLINK-3708
> URL: https://issues.apache.org/jira/browse/FLINK-3708
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Assignee: Stefan Richter
>
> Currently, the CEP library does not support Scala case classes, because the
> {{TypeExtractor}} cannot handle them. In order to support them, it would be
> necessary to offer a Scala API for the CEP library.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)