[
https://issues.apache.org/jira/browse/FLINK-22442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331236#comment-17331236
]
Cedric Chen edited comment on FLINK-22442 at 4/24/21, 2:37 PM:
---------------------------------------------------------------
I want to fix this bug ,Please assign this fix to me, thanks. [~aljoscha]
and
Test case is :
{code:java}
public class TestCepJava {
public static void main(String[] args) throws NoSuchFieldException,
IllegalAccessException {
testJPatternStreamChangeTimeCharacteristic(getJPatternStream());
}
public static PatternStream<Event> getJPatternStream(){
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input =
env.fromElements(
new Event(1, "barfoo", 1.0),
new Event(8, "end", 1.0));
Pattern<Event, ?> pattern =
Pattern.<Event>begin("start")
.where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws
Exception {
return value.getName().equals("start");
}
});
PatternStream<Event> eventPatternStream = CEP.pattern(input, pattern);
return eventPatternStream;
}
public static <T> void
testJPatternStreamChangeTimeCharacteristic(PatternStream<T> estream) throws
NoSuchFieldException, IllegalAccessException {
getTimeBehaviour(estream);
PatternStream<T> pStream = estream.inProcessingTime();
getTimeBehaviour(pStream);
}
public static <T> void getTimeBehaviour(PatternStream pstream) throws
NoSuchFieldException, IllegalAccessException {
Field builder = pstream.getClass().getDeclaredField("builder");
builder.setAccessible(true);
Object o = builder.get(pstream);
Field timeBehaviour = o.getClass().getDeclaredField("timeBehaviour");
timeBehaviour.setAccessible(true);
System.out.println(timeBehaviour.get(o));
}
}
{code}
{code:scala}
// Some comments here
// org.apache.flink.cep.scala.PatternStream
// org.apache.flink.cep.PatternStream
object TestCepScala {
def main(args: Array[String]): Unit = {
updateCepTimeCharacteristicByScalaApi()
/*
* the result is
* EventTime
* EventTime
*
* the real JPatternStream's TimeCharacteristic is not be changed
* */
}
def updateCepTimeCharacteristicByScalaApi(): Unit = {
// get org.apache.flink.cep.PatternStream
val jestream: org.apache.flink.cep.PatternStream[Event] =
TestCepJava.getJPatternStream
//get org.apache.flink.cep.scala.PatternStream
val sePstream = new PatternStream[Event](jestream)
//get and print TimeBehaviour
getTimeBehaviourFromScalaPatternStream(sePstream)
//change TimeCharacteristic use scala api
val sPstream: PatternStream[Event] = sePstream.inProcessingTime()
//get and print TimeBehaviour
getTimeBehaviourFromScalaPatternStream(sPstream)
}
def getTimeBehaviourFromScalaPatternStream(scPstream:
org.apache.flink.cep.scala.PatternStream[Event]): Unit = {
val field: Field = scPstream.getClass.getDeclaredField("jPatternStream")
field.setAccessible(true)
val JPattern: AnyRef = field.get(scPstream)
val stream: cep.PatternStream[Event] =
JPattern.asInstanceOf[cep.PatternStream[Event]]
TestCepJava.getTimeBehaviour(stream)
}
{code}
was (Author: wydhcws):
I want to fix this bug
and
Test case is :
{code:java}
public class TestCepJava {
public static void main(String[] args) throws NoSuchFieldException,
IllegalAccessException {
testJPatternStreamChangeTimeCharacteristic(getJPatternStream());
}
public static PatternStream<Event> getJPatternStream(){
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Event> input =
env.fromElements(
new Event(1, "barfoo", 1.0),
new Event(8, "end", 1.0));
Pattern<Event, ?> pattern =
Pattern.<Event>begin("start")
.where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws
Exception {
return value.getName().equals("start");
}
});
PatternStream<Event> eventPatternStream = CEP.pattern(input, pattern);
return eventPatternStream;
}
public static <T> void
testJPatternStreamChangeTimeCharacteristic(PatternStream<T> estream) throws
NoSuchFieldException, IllegalAccessException {
getTimeBehaviour(estream);
PatternStream<T> pStream = estream.inProcessingTime();
getTimeBehaviour(pStream);
}
public static <T> void getTimeBehaviour(PatternStream pstream) throws
NoSuchFieldException, IllegalAccessException {
Field builder = pstream.getClass().getDeclaredField("builder");
builder.setAccessible(true);
Object o = builder.get(pstream);
Field timeBehaviour = o.getClass().getDeclaredField("timeBehaviour");
timeBehaviour.setAccessible(true);
System.out.println(timeBehaviour.get(o));
}
}
{code}
{code:scala}
// Some comments here
// org.apache.flink.cep.scala.PatternStream
// org.apache.flink.cep.PatternStream
object TestCepScala {
def main(args: Array[String]): Unit = {
updateCepTimeCharacteristicByScalaApi()
/*
* the result is
* EventTime
* EventTime
*
* the real JPatternStream's TimeCharacteristic is not be changed
* */
}
def updateCepTimeCharacteristicByScalaApi(): Unit = {
// get org.apache.flink.cep.PatternStream
val jestream: org.apache.flink.cep.PatternStream[Event] =
TestCepJava.getJPatternStream
//get org.apache.flink.cep.scala.PatternStream
val sePstream = new PatternStream[Event](jestream)
//get and print TimeBehaviour
getTimeBehaviourFromScalaPatternStream(sePstream)
//change TimeCharacteristic use scala api
val sPstream: PatternStream[Event] = sePstream.inProcessingTime()
//get and print TimeBehaviour
getTimeBehaviourFromScalaPatternStream(sPstream)
}
def getTimeBehaviourFromScalaPatternStream(scPstream:
org.apache.flink.cep.scala.PatternStream[Event]): Unit = {
val field: Field = scPstream.getClass.getDeclaredField("jPatternStream")
field.setAccessible(true)
val JPattern: AnyRef = field.get(scPstream)
val stream: cep.PatternStream[Event] =
JPattern.asInstanceOf[cep.PatternStream[Event]]
TestCepJava.getTimeBehaviour(stream)
}
{code}
> Using scala api to change the TimeCharacteristic of the PatternStream is
> invalid
> --------------------------------------------------------------------------------
>
> Key: FLINK-22442
> URL: https://issues.apache.org/jira/browse/FLINK-22442
> Project: Flink
> Issue Type: Bug
> Components: Library / CEP
> Affects Versions: 1.12.0, 1.12.1, 1.12.2
> Reporter: Cedric Chen
> Priority: Major
> Labels: pull-request-available
>
> Using scala api to change the TimeCharacteristic of the PatternStream is
> invalid
> you can only use the eventTime for PatternStream
> the bug is :
> in the code in org.apache.flink.cep.scala.PatternStream
> when we called function like inProcessingTime()
> the real JPatternStream in the object not be updated
> {code:java}
> // org.apache.flink.cep.scala.PatternStream
> class PatternStream[T](var jPatternStream: JPatternStream[T]) {
> private[flink] def wrappedPatternStream = jPatternStream
> ......
> def sideOutputLateData(lateDataOutputTag: OutputTag[T]): PatternStream[T] = {
> jPatternStream.sideOutputLateData(lateDataOutputTag)
> this
> }
> def inProcessingTime(): PatternStream[T] = {
> jPatternStream.inProcessingTime()
> this
> }
> def inEventTime(): PatternStream[T] = {
> jPatternStream.inEventTime()
> this
> }
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)