[ 
https://issues.apache.org/jira/browse/FLINK-22888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu reassigned FLINK-22888:
-------------------------------

    Assignee: Nicholas Jiang

> Matches results may be wrong when using notNext as the last part of the 
> pattern with Window
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22888
>                 URL: https://issues.apache.org/jira/browse/FLINK-22888
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.9.0
>            Reporter: Yue Ma
>            Assignee: Nicholas Jiang
>            Priority: Minor
>
> the pattern is like 
> Pattern.begin("start").where(records == "a")
>             .notNext("notNext").where(records == "b")
>             .withIn(5milliseconds).
>  
> If there is only one event *"a"* in 5 milliseconds. I think this *“a”* should 
> be output as the correct result of the match next time in advanceTime.
> But in the actual operation of CEP. This “a” will be treated as matching 
> timeout data
> {code:java}
> // code placeholder
> @Test
> public void testNoNextWithWindow() throws Exception {
>    StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>    // (Event, timestamp)
>    DataStream<Event> input = env.fromElements(
>       Tuple2.of(new Event(1, "start", 1.0), 5L),
>       // last element for high final watermark
>       Tuple2.of(new Event(5, "final", 5.0), 100L)
>    ).assignTimestampsAndWatermarks(new 
> AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
>       @Override
>       public long extractTimestamp(Tuple2<Event, Long> element, long 
> previousTimestamp) {
>          return element.f1;
>       }
>       @Override
>       public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> 
> lastElement, long extractedTimestamp) {
>          return new Watermark(lastElement.f1 - 5);
>       }
>    }).map(new MapFunction<Tuple2<Event, Long>, Event>() {
>       @Override
>       public Event map(Tuple2<Event, Long> value) throws Exception {
>          return value.f0;
>       }
>    });
>    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new 
> SimpleCondition<Event>() {
>       @Override
>       public boolean filter(Event value) throws Exception {
>          return value.getName().equals("start");
>       }
>    }).notNext("middle").where(new SimpleCondition<Event>() {
>       @Override
>       public boolean filter(Event value) throws Exception {
>          return value.getName().equals("middle");
>       }
>    }).within(Time.milliseconds(5L));
>    DataStream<String> result = CEP.pattern(input, pattern).select(
>       new PatternSelectFunction<Event, String>() {
>          @Override
>          public String select(Map<String, List<Event>> pattern) {
>             StringBuilder builder = new StringBuilder();
>             builder.append(pattern.get("start").get(0).getId());
>             return builder.toString();
>          }
>       }
>    );
>    List<String> resultList = new ArrayList<>();
>    DataStreamUtils.collect(result).forEachRemaining(resultList::add);
>    resultList.sort(String::compareTo);
>    assertEquals(Arrays.asList("1"), resultList);
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to