[
https://issues.apache.org/jira/browse/FLINK-3708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249852#comment-15249852
]
ASF GitHub Bot commented on FLINK-3708:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/1905#discussion_r60402984
--- Diff:
flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPITCase.scala
---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import java.util.{Map => JMap}
+
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.core.fs.FileSystem
+import org.apache.flink.streaming.api.TimeCharacteristic
+import
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.{After, Before, Rule, Test}
+import org.junit.rules.TemporaryFolder
+
+
+@SuppressWarnings(Array("serial")) class CEPITCase extends
ScalaStreamingMultipleProgramsTestBase {
+ private var resultPath: String = null
+ private var expected: String = null
+ val _tempFolder = new TemporaryFolder
+
+ @Rule
+ def tempFolder: TemporaryFolder = _tempFolder
+
+ @Before
+ @throws[Exception]
+ def before {
+ resultPath = tempFolder.newFile.toURI.toString
+ expected = ""
+ }
+
+ @After
+ @throws[Exception]
+ def after {
+ TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
+ }
+
+ @Test
+ @throws[Exception]
+ def testSimplePatternCEP {
+ val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
+ val input: DataStream[Event] = env.fromElements(
+ new Event(1, "barfoo", 1.0),
+ new Event(2, "start", 2.0),
+ new Event(3, "foobar", 3.0),
+ new SubEvent(4, "foo", 4.0, 1.0),
+ new Event(5, "middle", 5.0),
+ new SubEvent(6, "middle", 6.0, 2.0),
+ new SubEvent(7, "bar", 3.0, 3.0),
+ new Event(42, "42", 42.0),
+ new Event(8, "end", 1.0))
+ val pattern: Pattern[Event, _] = Pattern.begin[Event]("start")
+ .where((value: Event) => value.name == "start")
+ .followedBy("middle")
+ .subtype(classOf[SubEvent])
+ .where((value: SubEvent) => value.name == "middle")
+ .followedBy("end")
+ .where((value: Event) => value.name == "end")
+ val result: DataStream[String] = CEP.pattern(input, pattern)
+ .select((pattern: JMap[String, Event]) => {
+ val builder: StringBuilder = new StringBuilder
+ builder.append(pattern.get("start").id)
+ .append(",")
+ .append(pattern.get("middle").id)
+ .append(",")
+ .append(pattern.get("end").id)
+ .toString
+ })
+ result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE)
+ expected = "2,6,8"
+ env.execute
+ }
+
+ @Test
+ @throws[Exception]
+ def testSimpleKeyedPatternCEP {
+ val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(2)
+ val input: DataStream[Event] = env.fromElements(
+ new Event(1, "barfoo", 1.0),
+ new Event(2, "start", 2.0),
+ new Event(3, "start", 2.1),
+ new Event(3, "foobar", 3.0),
+ new SubEvent(4, "foo", 4.0, 1.0),
+ new SubEvent(3, "middle", 3.2, 1.0),
+ new Event(42, "start", 3.1),
+ new SubEvent(42, "middle", 3.3, 1.2),
+ new Event(5, "middle", 5.0),
+ new SubEvent(2, "middle", 6.0, 2.0),
+ new SubEvent(7, "bar", 3.0, 3.0),
+ new Event(42, "42", 42.0),
+ new Event(3, "end", 2.0),
+ new Event(2, "end", 1.0),
+ new Event(42, "end", 42.0))
+ .keyBy((value: Event) => value.id)
+ val pattern: Pattern[Event, _] = Pattern.begin[Event]("start")
+ .where((value: Event) => value.name == "start")
+ .followedBy("middle")
+ .subtype(classOf[SubEvent])
+ .where((value: SubEvent) => value.name == "middle")
+ .followedBy("end")
+ .where((value: Event) => value.name == "end")
+ val result: DataStream[String] = CEP.pattern(input,
pattern).select((pattern: JMap[String, Event]) => {
+ val builder: StringBuilder = new StringBuilder
+ builder
+ .append(pattern.get("start").id)
+ .append(",")
+ .append(pattern.get("middle").id)
+ .append(",")
+ .append(pattern.get("end").id)
+ .toString
+ })
+ result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE)
+ expected = "2,2,2\n3,3,3\n42,42,42"
+ env.execute
+ }
+
+ @Test
+ @throws[Exception]
+ def testSimplePatternEventTime {
+ val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val input: DataStream[Event] = env.fromElements(
+ Tuple2.of(new Event(1, "start", 1.0), 5L),
+ Tuple2.of(new Event(2, "middle", 2.0), 1L),
+ Tuple2.of(new Event(3, "end", 3.0), 3L),
+ Tuple2.of(new Event(4, "end", 4.0), 10L),
+ Tuple2.of(new Event(5, "middle", 5.0), 7L),
+ Tuple2.of(new Event(5, "middle", 5.0), 100L))
+ .assignTimestampsAndWatermarks(new
AssignerWithPunctuatedWatermarks[Tuple2[Event, Long]] {
+ def extractTimestamp(element: Tuple2[Event, Long],
previousTimestamp: Long): Long = {
+ element.f1
+ }
+
+ def checkAndGetNextWatermark(lastElement: Tuple2[Event, Long],
extractedTimestamp: Long): Watermark = {
+ new Watermark(lastElement.f1 - 5)
+ }
+ }).map((value: Tuple2[Event, Long]) => value.f0)
+
+ val pattern: Pattern[Event, _] = Pattern.begin[Event]("start")
+ .where((value: Event) => value.name == "start")
+ .followedBy("middle")
+ .where((value: Event) => value.name == "middle")
+ .followedBy("end")
+ .where((value: Event) => value.name == "end")
+
+ val result: DataStream[String] = CEP.pattern(input, pattern)
+ .select((pattern: JMap[String, Event]) => {
+ val builder: StringBuilder = new StringBuilder
+ builder
+ .append(pattern.get("start").id)
+ .append(",")
+ .append(pattern.get("middle").id)
+ .append(",")
+ .append(pattern.get("end").id)
+ .toString
+ })
+ result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE)
+ expected = "1,5,4"
+ env.execute
+ }
+
+ @Test
+ @throws[Exception]
+ def testSimpleKeyedPatternEventTime {
+ val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setParallelism(2)
+ val input: DataStream[Event] = env.fromElements(
+ Tuple2.of(new Event(1, "start", 1.0), 5L),
+ Tuple2.of(new Event(1, "middle", 2.0), 1L),
+ Tuple2.of(new Event(2, "middle", 2.0), 4L),
+ Tuple2.of(new Event(2, "start", 2.0), 3L),
+ Tuple2.of(new Event(1, "end", 3.0), 3L),
+ Tuple2.of(new Event(3, "start", 4.1), 5L),
+ Tuple2.of(new Event(1, "end", 4.0), 10L),
+ Tuple2.of(new Event(2, "end", 2.0), 8L),
+ Tuple2.of(new Event(1, "middle", 5.0), 7L),
+ Tuple2.of(new Event(3, "middle", 6.0), 9L),
+ Tuple2.of(new Event(3, "end", 7.0), 7L))
+ .assignTimestampsAndWatermarks(new
AssignerWithPunctuatedWatermarks[Tuple2[Event, Long]] {
+ def extractTimestamp(element: Tuple2[Event, Long],
currentTimestamp: Long): Long = {
+ element.f1
+ }
+
+ def checkAndGetNextWatermark(lastElement: Tuple2[Event, Long],
extractedTimestamp: Long): Watermark = {
+ new Watermark(lastElement.f1 - 5)
+ }
+ }).map((value: Tuple2[Event, Long]) => value.f0)
+ .keyBy((value: Event) => value.id)
+ val pattern: Pattern[Event, _] = Pattern.begin[Event]("start")
+ .where((value: Event) => value.name == "start")
+ .followedBy("middle")
+ .where((value: Event) => value.name == "middle")
+ .followedBy("end")
+ .where((value: Event) => value.name == "end")
+ val result: DataStream[String] = CEP.pattern(input,
pattern).select((pattern: JMap[String, Event]) => {
+ val builder: StringBuilder = new StringBuilder
+ builder
+ .append(pattern.get("start").id)
+ .append(",")
+ .append(pattern.get("middle").id)
+ .append(",")
+ .append(pattern.get("end").id)
+ .toString
+ })
+ result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE)
+ expected = "1,1,1\n2,2,2"
+ env.execute
+ }
+
+
+ @Test
+ @throws[Exception]
+ def testSimplePatternWithSingleState {
+ val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
+ val input: DataStream[Tuple2[Int, Int]] = env.fromElements(new
Tuple2[Int, Int](0, 1), new Tuple2[Int, Int](0, 2))
--- End diff --
We should test Scala tuples here and not Flink's Java tuples.
> Scala API for CEP
> -----------------
>
> Key: FLINK-3708
> URL: https://issues.apache.org/jira/browse/FLINK-3708
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Assignee: Stefan Richter
>
> Currently, The CEP library does not support Scala case classes, because the
> {{TypeExtractor}} cannot handle them. In order to support them, it would be
> necessary to offer a Scala API for the CEP library.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)