[ 
https://issues.apache.org/jira/browse/FLINK-22442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17331236#comment-17331236
 ] 

Cedric Chen edited comment on FLINK-22442 at 4/24/21, 2:37 PM:
---------------------------------------------------------------

I want to fix this bug ,Please assign this fix to me, thanks. [~aljoscha]
and 
Test case is :


{code:java}
public class TestCepJava {

    public static void main(String[] args) throws NoSuchFieldException, 
IllegalAccessException {
        testJPatternStreamChangeTimeCharacteristic(getJPatternStream());
    }


    public static PatternStream<Event> getJPatternStream(){
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> input =
                env.fromElements(
                        new Event(1, "barfoo", 1.0),
                        new Event(8, "end", 1.0));

        Pattern<Event, ?> pattern =
                Pattern.<Event>begin("start")
                        .where(
                                new SimpleCondition<Event>() {

                                    @Override
                                    public boolean filter(Event value) throws 
Exception {
                                        return value.getName().equals("start");
                                    }
                                });
        PatternStream<Event> eventPatternStream = CEP.pattern(input, pattern);
        return eventPatternStream;
    }

    public static <T> void 
testJPatternStreamChangeTimeCharacteristic(PatternStream<T> estream) throws 
NoSuchFieldException, IllegalAccessException {
        getTimeBehaviour(estream);
        PatternStream<T> pStream = estream.inProcessingTime();
        getTimeBehaviour(pStream);
    }

    public static <T> void getTimeBehaviour(PatternStream pstream) throws 
NoSuchFieldException, IllegalAccessException {
        Field builder = pstream.getClass().getDeclaredField("builder");
        builder.setAccessible(true);
        Object o = builder.get(pstream);
        Field timeBehaviour = o.getClass().getDeclaredField("timeBehaviour");
        timeBehaviour.setAccessible(true);
        System.out.println(timeBehaviour.get(o));
    }
}
{code}



{code:scala}
// Some comments here
//    org.apache.flink.cep.scala.PatternStream
//    org.apache.flink.cep.PatternStream
object TestCepScala {
  def main(args: Array[String]): Unit = {
    updateCepTimeCharacteristicByScalaApi()

    /*
    * the result  is
    * EventTime
    * EventTime
    *
    * the real JPatternStream's  TimeCharacteristic  is not be changed
    * */
  }

  def updateCepTimeCharacteristicByScalaApi(): Unit = {

    //  get  org.apache.flink.cep.PatternStream
    val jestream: org.apache.flink.cep.PatternStream[Event] = 
TestCepJava.getJPatternStream

    //get org.apache.flink.cep.scala.PatternStream
    val sePstream = new PatternStream[Event](jestream)

    //get and print TimeBehaviour
    getTimeBehaviourFromScalaPatternStream(sePstream)
    //change TimeCharacteristic use scala api
    val sPstream: PatternStream[Event] = sePstream.inProcessingTime()

    //get and print TimeBehaviour
    getTimeBehaviourFromScalaPatternStream(sPstream)


  }

  def getTimeBehaviourFromScalaPatternStream(scPstream: 
org.apache.flink.cep.scala.PatternStream[Event]): Unit = {
    val field: Field = scPstream.getClass.getDeclaredField("jPatternStream")
    field.setAccessible(true)
    val JPattern: AnyRef = field.get(scPstream)
    val stream: cep.PatternStream[Event] = 
JPattern.asInstanceOf[cep.PatternStream[Event]]
    TestCepJava.getTimeBehaviour(stream)
  }
{code}




was (Author: wydhcws):
I want to fix this bug 
and 
Test case is :


{code:java}
public class TestCepJava {

    public static void main(String[] args) throws NoSuchFieldException, 
IllegalAccessException {
        testJPatternStreamChangeTimeCharacteristic(getJPatternStream());
    }


    public static PatternStream<Event> getJPatternStream(){
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> input =
                env.fromElements(
                        new Event(1, "barfoo", 1.0),
                        new Event(8, "end", 1.0));

        Pattern<Event, ?> pattern =
                Pattern.<Event>begin("start")
                        .where(
                                new SimpleCondition<Event>() {

                                    @Override
                                    public boolean filter(Event value) throws 
Exception {
                                        return value.getName().equals("start");
                                    }
                                });
        PatternStream<Event> eventPatternStream = CEP.pattern(input, pattern);
        return eventPatternStream;
    }

    public static <T> void 
testJPatternStreamChangeTimeCharacteristic(PatternStream<T> estream) throws 
NoSuchFieldException, IllegalAccessException {
        getTimeBehaviour(estream);
        PatternStream<T> pStream = estream.inProcessingTime();
        getTimeBehaviour(pStream);
    }

    public static <T> void getTimeBehaviour(PatternStream pstream) throws 
NoSuchFieldException, IllegalAccessException {
        Field builder = pstream.getClass().getDeclaredField("builder");
        builder.setAccessible(true);
        Object o = builder.get(pstream);
        Field timeBehaviour = o.getClass().getDeclaredField("timeBehaviour");
        timeBehaviour.setAccessible(true);
        System.out.println(timeBehaviour.get(o));
    }
}
{code}



{code:scala}
// Some comments here
//    org.apache.flink.cep.scala.PatternStream
//    org.apache.flink.cep.PatternStream
object TestCepScala {
  def main(args: Array[String]): Unit = {
    updateCepTimeCharacteristicByScalaApi()

    /*
    * the result  is
    * EventTime
    * EventTime
    *
    * the real JPatternStream's  TimeCharacteristic  is not be changed
    * */
  }

  def updateCepTimeCharacteristicByScalaApi(): Unit = {

    //  get  org.apache.flink.cep.PatternStream
    val jestream: org.apache.flink.cep.PatternStream[Event] = 
TestCepJava.getJPatternStream

    //get org.apache.flink.cep.scala.PatternStream
    val sePstream = new PatternStream[Event](jestream)

    //get and print TimeBehaviour
    getTimeBehaviourFromScalaPatternStream(sePstream)
    //change TimeCharacteristic use scala api
    val sPstream: PatternStream[Event] = sePstream.inProcessingTime()

    //get and print TimeBehaviour
    getTimeBehaviourFromScalaPatternStream(sPstream)


  }

  def getTimeBehaviourFromScalaPatternStream(scPstream: 
org.apache.flink.cep.scala.PatternStream[Event]): Unit = {
    val field: Field = scPstream.getClass.getDeclaredField("jPatternStream")
    field.setAccessible(true)
    val JPattern: AnyRef = field.get(scPstream)
    val stream: cep.PatternStream[Event] = 
JPattern.asInstanceOf[cep.PatternStream[Event]]
    TestCepJava.getTimeBehaviour(stream)
  }
{code}



> Using scala api to change the TimeCharacteristic of the PatternStream is 
> invalid
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-22442
>                 URL: https://issues.apache.org/jira/browse/FLINK-22442
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.12.0, 1.12.1, 1.12.2
>            Reporter: Cedric Chen
>            Priority: Major
>              Labels: pull-request-available
>
> Using scala api to change the TimeCharacteristic of the PatternStream is 
> invalid
> you can only use the eventTime for PatternStream
> the bug is :
> in the code in   org.apache.flink.cep.scala.PatternStream
> when we called function like inProcessingTime()
> the real JPatternStream in the object not be updated
> {code:java}
> // org.apache.flink.cep.scala.PatternStream
> class PatternStream[T](var jPatternStream: JPatternStream[T]) {
>   private[flink] def wrappedPatternStream = jPatternStream
>  ...... 
>  def sideOutputLateData(lateDataOutputTag: OutputTag[T]): PatternStream[T] = {
>    jPatternStream.sideOutputLateData(lateDataOutputTag)
>    this
>  }
>   def inProcessingTime(): PatternStream[T] = {
>     jPatternStream.inProcessingTime()
>     this
>   }
>   def inEventTime(): PatternStream[T] = {
>     jPatternStream.inEventTime()
>     this
>   }
> }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to