David Anderson created FLINK-8914:
-------------------------------------

             Summary: CEP's greedy() modifier doesn't work
                 Key: FLINK-8914
                 URL: https://issues.apache.org/jira/browse/FLINK-8914
             Project: Flink
          Issue Type: Bug
          Components: CEP
    Affects Versions: 1.4.1, 1.4.0
            Reporter: David Anderson


When applied to the first or last component of a CEP Pattern, greedy() has no 
visible effect: instead of only the longest match, a match is also emitted for 
every shorter run. Here's an example:
{code:java}
package com.dataartisans.flinktraining.exercises.datastream_java.cep;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.Map;

public class RunLength {

  public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStream<Integer> input = env.fromElements(1, 1, 1, 1, 1, 0, 1, 1, 1, 0);

    Pattern<Integer, ?> onesThenZero = Pattern.<Integer>begin("ones")
      .where(new SimpleCondition<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
          return value == 1;
        }
      })
      .oneOrMore()
      .greedy()
      .consecutive()
      .next("zero")
      .where(new SimpleCondition<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
          return value == 0;
        }
      });

    PatternStream<Integer> patternStream = CEP.pattern(input, onesThenZero);

    // Expected: 5 3
    // Actual: 5 4 3 2 1 3 2 1
    patternStream.select(new LengthOfRun()).print();

    env.execute();
  }

  public static class LengthOfRun implements PatternSelectFunction<Integer, Integer> {
    @Override
    public Integer select(Map<String, List<Integer>> pattern) {
      return pattern.get("ones").size();
    }
  }
}
{code}
The only workaround for now seems to be to rewrite the pattern so that greedy() 
isn't needed – i.e. by bracketing the greedy section with a prefix and suffix 
that both have to be matched.
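
To make the expected and actual results above easier to check, here is a small plain-Java sketch that needs no Flink dependency. It is only an illustration of the semantics I expected, not of how CEP is implemented: the class and method names (GreedyRunLengths, maximalRuns, everySuffix) are made up for this example, and the second method merely encodes my reading that the reported output is consistent with a match being started at every 1 in a run.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: models the run lengths discussed above.
class GreedyRunLengths {

  // Length of each maximal run of 1s that is immediately followed by a 0.
  // This is the result expected from oneOrMore().greedy().consecutive().
  static List<Integer> maximalRuns(int[] events) {
    List<Integer> lengths = new ArrayList<>();
    int run = 0;
    for (int value : events) {
      if (value == 1) {
        run++;
      } else {
        // Only a 0 directly after at least one 1 completes a match.
        if (value == 0 && run > 0) {
          lengths.add(run);
        }
        run = 0;
      }
    }
    return lengths;
  }

  // Assumption: one match per starting position inside each run, which
  // reproduces the output actually observed in this report.
  static List<Integer> everySuffix(int[] events) {
    List<Integer> lengths = new ArrayList<>();
    for (int run : maximalRuns(events)) {
      for (int n = run; n >= 1; n--) {
        lengths.add(n);
      }
    }
    return lengths;
  }

  public static void main(String[] args) {
    int[] input = {1, 1, 1, 1, 1, 0, 1, 1, 1, 0};
    System.out.println(maximalRuns(input)); // prints [5, 3]
    System.out.println(everySuffix(input)); // prints [5, 4, 3, 2, 1, 3, 2, 1]
  }
}
```

The first list is what I would expect greedy() to produce for the input used in RunLength; the second matches what is currently printed.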



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
