dawidwys commented on a change in pull request #15742:
URL: https://github.com/apache/flink/pull/15742#discussion_r620157830
##########
File path:
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
##########
@@ -447,17 +447,17 @@ class PatternStream[T](jPatternStream: JPatternStream[T])
{
}
def sideOutputLateData(lateDataOutputTag: OutputTag[T]): PatternStream[T] = {
- jPatternStream.sideOutputLateData(lateDataOutputTag)
+ jPatternStream = jPatternStream.sideOutputLateData(lateDataOutputTag)
Review comment:
I'd rather suggest creating a new scala `PatternStream`. Having a
mutable `var` does not look good.
##########
File path:
flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala
##########
@@ -0,0 +1,67 @@
+package org.apache.flink.cep.scala
+
+import java.lang.reflect.Field
+
+import org.apache.flink.cep
+import org.apache.flink.cep.pattern.Pattern
+import org.apache.flink.cep.pattern.conditions.SimpleCondition
+import org.apache.flink.streaming.api.datastream.DataStreamSource
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class CEPScalaApiPatternStreamTest {
+ /**
+ * These tests simply check that use the Scala API to update the
TimeCharacteristic of the PatternStream .
+ */
+
+ @Test
+ def updateCepTimeCharacteristicByScalaApi(): Unit = {
+
+ val env: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
+ val input: DataStreamSource[Event] = env.fromElements(Event(1, "barfoo",
1.0), Event(8, "end", 1.0))
+ val pattern: Pattern[Event, Event] = Pattern.begin("start").where(new
SimpleCondition[Event]() {
+ override def filter(value: Event): Boolean = value.name == "start"
+ })
+
+ val jestream: cep.PatternStream[Event] =
org.apache.flink.cep.CEP.pattern(input, pattern)
+
+ //get org.apache.flink.cep.scala.PatternStream
+ val sePstream = new PatternStream[Event](jestream)
+
+ //get TimeBehaviour
+ val time1: AnyRef = getTimeBehaviourFromScalaPatternStream(sePstream)
+
+ assertEquals(time1.toString, "EventTime")
+
+ //change TimeCharacteristic use scala api
+ val sPstream: PatternStream[Event] = sePstream.inProcessingTime()
+
+ //get TimeBehaviour
+ val time2: AnyRef = getTimeBehaviourFromScalaPatternStream(sPstream)
+
+ assertEquals(time2.toString, "ProcessingTime")
+
+
+ }
+
+ def getTimeBehaviourFromScalaPatternStream(seStream:
org.apache.flink.cep.scala.PatternStream[Event]) = {
+ val field: Field = seStream.getClass.getDeclaredField("jPatternStream")
+ field.setAccessible(true)
+ val JPattern: AnyRef = field.get(seStream)
+ val stream: cep.PatternStream[Event] =
JPattern.asInstanceOf[cep.PatternStream[Event]]
+ getTimeBehaviourFromJavaPatternStream(stream)
+ }
+
+ def getTimeBehaviourFromJavaPatternStream(jeStream:
org.apache.flink.cep.PatternStream[Event])={
+ val builder: Field = jeStream.getClass.getDeclaredField("builder")
+ builder.setAccessible(true)
+ val o: AnyRef = builder.get(jeStream)
+ val timeBehaviour: Field = o.getClass.getDeclaredField("timeBehaviour")
+ timeBehaviour.setAccessible(true)
Review comment:
That's a terrible way to test any change. Reflective access should be
really last resort and it's also an indication of a bigger problem. It is
totally susceptible to any changes to the underlying classes. It breaks all
encapsulation efforts. I am sorry if I am harsh, but please never test any code
in that way.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]