[ 
https://issues.apache.org/jira/browse/FLINK-22888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17513170#comment-17513170
 ] 

Nicholas Jiang commented on FLINK-22888:
----------------------------------------

[~mayuehappy], after our offline discussion, it turns out the description above 
is inaccurate. If there is an event "a" within the 5 milliseconds and there are 
further events that aren't event "b", event "a" can be output as the correct 
result of the match the next time advanceTime runs. This is because notNext is 
a node in the NFA, and it needs a following event to trigger it. If no other 
event reaches the notNext node, the previous event doesn't meet the conditions 
of the Pattern. Therefore, the issue isn't a bug.
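
To make the three cases concrete, here is a minimal sketch in plain Java. It is 
a toy model of "start, then notNext(forbidden), within(window)" and not Flink's 
actual NFA code. The class NotNextSketch, the Event type, the Outcome names and 
the evaluate helper are all hypothetical, and treating a gap equal to the window 
as timed out is an assumption of the sketch.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model only: illustrates the notNext reasoning, not Flink's implementation. */
final class NotNextSketch {

    /** Hypothetical outcomes for the first start event found in the input. */
    enum Outcome { MATCH, DISCARDED, TIMED_OUT, NO_START }

    /** Hypothetical event: a name plus an event-time timestamp in milliseconds. */
    static final class Event {
        final String name;
        final long timestamp;

        Event(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    /**
     * Decides the fate of the first event named {@code start}.
     * The notNext step can only be decided by the event that directly follows.
     */
    static Outcome evaluate(List<Event> events, String start, String forbidden, long windowMs) {
        for (int i = 0; i < events.size(); i++) {
            Event first = events.get(i);
            if (!first.name.equals(start)) {
                continue;
            }
            // No following event at all: nothing can trigger the notNext node.
            if (i + 1 >= events.size()) {
                return Outcome.TIMED_OUT;
            }
            Event next = events.get(i + 1);
            // Assumption of this sketch: a gap equal to the window counts as expired.
            if (next.timestamp - first.timestamp >= windowMs) {
                return Outcome.TIMED_OUT;
            }
            // The following event arrived in time, so it decides the notNext step.
            return next.name.equals(forbidden) ? Outcome.DISCARDED : Outcome.MATCH;
        }
        return Outcome.NO_START;
    }

    public static void main(String[] args) {
        // "a" followed by a non-"b" event inside the window.
        System.out.println(evaluate(
                Arrays.asList(new Event("a", 1L), new Event("c", 3L)), "a", "b", 5L));
        // "a" followed by "b" inside the window.
        System.out.println(evaluate(
                Arrays.asList(new Event("a", 1L), new Event("b", 3L)), "a", "b", 5L));
        // Same shape as the reported test: the next event arrives after the window.
        System.out.println(evaluate(
                Arrays.asList(new Event("a", 5L), new Event("final", 100L)), "a", "b", 5L));
    }
}
```

In this sketch the third call mirrors the test in the issue: the only event 
after "start" has timestamp 100, well outside the 5 millisecond window, so the 
partial match is reported as timed out rather than as a match.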

> Matches results may be wrong when using notNext as the last part of the 
> pattern with Window
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22888
>                 URL: https://issues.apache.org/jira/browse/FLINK-22888
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.9.0
>            Reporter: Yue Ma
>            Assignee: Nicholas Jiang
>            Priority: Minor
>
> The pattern looks like this:
> Pattern.begin("start").where(records == "a")
>             .notNext("notNext").where(records == "b")
>             .within(Time.milliseconds(5))
>  
> If there is only one event *"a"* within the 5 milliseconds, I think this *"a"* 
> should be output as the correct result of the match the next time advanceTime 
> runs. But when CEP actually runs, this "a" is treated as timed-out match 
> data.
> {code:java}
> @Test
> public void testNoNextWithWindow() throws Exception {
>    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>    // (Event, timestamp)
>    DataStream<Event> input = env.fromElements(
>       Tuple2.of(new Event(1, "start", 1.0), 5L),
>       // last element for high final watermark
>       Tuple2.of(new Event(5, "final", 5.0), 100L)
>    ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
>       @Override
>       public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
>          return element.f1;
>       }
>       @Override
>       public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
>          return new Watermark(lastElement.f1 - 5);
>       }
>    }).map(new MapFunction<Tuple2<Event, Long>, Event>() {
>       @Override
>       public Event map(Tuple2<Event, Long> value) throws Exception {
>          return value.f0;
>       }
>    });
>    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
>       @Override
>       public boolean filter(Event value) throws Exception {
>          return value.getName().equals("start");
>       }
>    }).notNext("middle").where(new SimpleCondition<Event>() {
>       @Override
>       public boolean filter(Event value) throws Exception {
>          return value.getName().equals("middle");
>       }
>    }).within(Time.milliseconds(5L));
>    DataStream<String> result = CEP.pattern(input, pattern).select(
>       new PatternSelectFunction<Event, String>() {
>          @Override
>          public String select(Map<String, List<Event>> pattern) {
>             StringBuilder builder = new StringBuilder();
>             builder.append(pattern.get("start").get(0).getId());
>             return builder.toString();
>          }
>       }
>    );
>    List<String> resultList = new ArrayList<>();
>    DataStreamUtils.collect(result).forEachRemaining(resultList::add);
>    resultList.sort(String::compareTo);
>    assertEquals(Arrays.asList("1"), resultList);
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
