Yue Ma created FLINK-22888:
------------------------------
Summary: Matches results may be wrong when using notNext as the
last part of the pattern with Window
Key: FLINK-22888
URL: https://issues.apache.org/jira/browse/FLINK-22888
Project: Flink
Issue Type: Bug
Components: Library / CEP
Affects Versions: 1.9.0
Reporter: Yue Ma
The pattern looks like this:
Pattern.begin("start").where(records == "a")
    .notNext("notNext").where(records == "b")
    .within(Time.milliseconds(5))
If only one event *"a"* arrives within those 5 milliseconds, I think this
*"a"* should be emitted as a correct match the next time advanceTime runs.
In the actual operation of CEP, however, this *"a"* is treated as a
timed-out partial match.
{code:java}
@Test
public void testNoNextWithWindow() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // (Event, timestamp)
    DataStream<Event> input = env.fromElements(
            Tuple2.of(new Event(1, "start", 1.0), 5L),
            // last element for high final watermark
            Tuple2.of(new Event(5, "final", 5.0), 100L)
    ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
        @Override
        public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
            return element.f1;
        }

        @Override
        public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
            return new Watermark(lastElement.f1 - 5);
        }
    }).map(new MapFunction<Tuple2<Event, Long>, Event>() {
        @Override
        public Event map(Tuple2<Event, Long> value) throws Exception {
            return value.f0;
        }
    });

    // "start" must not be directly followed by "middle", all within 5 ms
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("start");
        }
    }).notNext("middle").where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("middle");
        }
    }).within(Time.milliseconds(5L));

    DataStream<String> result = CEP.pattern(input, pattern).select(
            new PatternSelectFunction<Event, String>() {
                @Override
                public String select(Map<String, List<Event>> pattern) {
                    StringBuilder builder = new StringBuilder();
                    builder.append(pattern.get("start").get(0).getId());
                    return builder.toString();
                }
            }
    );

    List<String> resultList = new ArrayList<>();
    DataStreamUtils.collect(result).forEachRemaining(resultList::add);
    resultList.sort(String::compareTo);

    // Expected by the reporter: the lone "start" event (id 1) is emitted as a match
    assertEquals(Arrays.asList("1"), resultList);
}
{code}
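To make the difference between the reported and the expected behavior
concrete, here is a toy model in plain Java. It is only a sketch and does
not use or reproduce Flink's NFA: the class NotNextWindowSketch, its
evaluate method, and the emitOnWindowExpiry switch are all hypothetical
names introduced for illustration. It assumes that a "start" whose window
closes without a directly following event is either reported as a timeout
(the behavior described above) or emitted as a match (the behavior the
reporter expects).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: NOT Flink's implementation. All names here are hypothetical.
class NotNextWindowSketch {

    /** Minimal event: a name and an event-time timestamp in milliseconds. */
    static final class Event {
        final String name;
        final long timestamp;

        Event(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    /** Timestamps of "start" events that ended up as matches or as timeouts. */
    static final class Result {
        final List<Long> matched = new ArrayList<>();
        final List<Long> timedOut = new ArrayList<>();
    }

    /**
     * Evaluates begin("start").notNext("middle").within(windowMs) over events
     * that are already sorted by timestamp.
     *
     * @param emitOnWindowExpiry false models the reported behavior (timeout),
     *                           true models the behavior the reporter expects (match)
     */
    static Result evaluate(List<Event> events, long windowMs, long finalWatermark,
                           boolean emitOnWindowExpiry) {
        Result result = new Result();
        for (int i = 0; i < events.size(); i++) {
            Event start = events.get(i);
            if (!start.name.equals("start")) {
                continue;
            }
            long deadline = start.timestamp + windowMs;
            Event next = i + 1 < events.size() ? events.get(i + 1) : null;
            if (next != null && next.timestamp < deadline) {
                // An event directly follows inside the window: notNext decides.
                if (!next.name.equals("middle")) {
                    result.matched.add(start.timestamp);
                }
            } else if (finalWatermark >= deadline) {
                // The window closed with nothing following the "start" event.
                if (emitOnWindowExpiry) {
                    result.matched.add(start.timestamp);
                } else {
                    result.timedOut.add(start.timestamp);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same shape as the test above: "start" at 5 ms, "final" at 100 ms.
        List<Event> events = Arrays.asList(new Event("start", 5L), new Event("final", 100L));
        Result reported = evaluate(events, 5L, 95L, false);
        Result expected = evaluate(events, 5L, 95L, true);
        System.out.println("reported: matched=" + reported.matched + " timedOut=" + reported.timedOut);
        System.out.println("expected: matched=" + expected.matched + " timedOut=" + expected.timedOut);
    }
}
```

With the input from the test, the model puts the "start" event at 5 ms into
timedOut when emitOnWindowExpiry is false and into matched when it is true,
which is the distinction this issue is about.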