[ 
https://issues.apache.org/jira/browse/FLINK-9547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16505021#comment-16505021
 ] 

Lucas Resch edited comment on FLINK-9547 at 6/7/18 6:07 PM:
------------------------------------------------------------

[~dawidwys] I created a small example that does something similar. Somehow the 
behavior is different though. Now it doesn't call the initial pattern but the 
one on the windowed stream is called. Something is definitely wrong.

 
{code:java}
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Integer> objectDataStreamSource = env.fromElements(
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10
);

SingleOutputStreamOperator<Long> forces = objectDataStreamSource
        .filter((FilterFunction<Integer>) Objects::nonNull)
        .process(new ProcessFunction<Integer, Long>() {
            @Override
            public void processElement(Integer value, Context ctx, 
Collector<Long> out) throws Exception {
                out.collect(value.longValue());
            }
        });

Pattern<Long, Long> forcesMock = Pattern.<Long>begin("start").where(new 
SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) {
        return true;
    }
});

CEP.pattern(forces, forcesMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws 
Exception {
                return String.format("Prints %d as expected", 
pattern.get("start").get(0));
            }
        }).print();

SingleOutputStreamOperator<Long> intervals = forces
        .countWindowAll(2, 1)
        .process(new ProcessAllWindowFunction<Long, Long, GlobalWindow>() {
            @Override
            public void process(Context context, Iterable<Long> elements, 
Collector<Long> out) throws Exception {
                List<Long> items = new ArrayList<>();
                elements.forEach(items::add);
                if (items.size() == 2) {
                    out.collect(items.get(0));
                }
            }
        });

Pattern<Long, Long> intervalMock = Pattern.<Long>begin("start").where(new 
SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return true;
    }
});

CEP.pattern(intervals, intervalMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws 
Exception {
                return String.format("Doesn't print %d", 
pattern.get("start").get(0));
            }
        }).print();

env.execute();
{code}


was (Author: mlnotw):
[~dawidwys] I created a small example that does something similar. Somehow the 
behavior is different though. Now it doesn't call the initial pattern but the 
one on the windowed stream is called. Something is definitely wrong.

 
{code:java}
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource<Integer> objectDataStreamSource = env.fromElements(
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10
);

SingleOutputStreamOperator<Long> forces = objectDataStreamSource
        .filter((FilterFunction<Integer>) Objects::nonNull)
        .process(new ProcessFunction<Integer, Long>() {
            @Override
            public void processElement(Integer value, Context ctx, 
Collector<Long> out) throws Exception {
                out.collect(value.longValue());
            }
        });

Pattern<Long, Long> forcesMock = Pattern.<Long>begin("start").where(new 
SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) {
        return true;
    }
});

CEP.pattern(forces, forcesMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws 
Exception {
                return String.format("Prints %d as expected", 
pattern.get("start").get(0));
            }
        }).print();

// Create another stream based on a sliding window over the input stream
SingleOutputStreamOperator<Long> intervals = forces
        .countWindowAll(2, 1)
        .process(new ProcessAllWindowFunction<Long, Long, GlobalWindow>() {
            @Override
            public void process(Context context, Iterable<Long> elements, 
Collector<Long> out) throws Exception {
                List<Long> items = new ArrayList<>();
                elements.forEach(items::add);
                if (items.size() == 2) {
                    out.collect(items.get(0));
                }
            }
        });

Pattern<Long, Long> intervalMock = Pattern.<Long>begin("start").where(new 
SimpleCondition<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return true;
    }
});

CEP.pattern(intervals, intervalMock)
        .select(new PatternSelectFunction<Long, String>() {
            @Override
            public String select(Map<String, List<Long>> pattern) throws 
Exception {
                return String.format("Doesn't print %d", 
pattern.get("start").get(0));
            }
        }).print();

env.execute();
{code}

> CEP pattern not called on windowed stream
> -----------------------------------------
>
>                 Key: FLINK-9547
>                 URL: https://issues.apache.org/jira/browse/FLINK-9547
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.2, 1.5.0
>            Reporter: Lucas Resch
>            Priority: Major
>
> When trying to match a pattern on a stream that was windowed the pattern will 
> not be called. The following shows example code where the issue was noticed:
> {code:java}
> // Set up stream
> SingleOutputStreamOperator<ForceZ> forces = ...
>         .filter(new FilterForcesFunction())
>         .process(new ProcessForcesFunction());
> // Define mock pattern
> Pattern<ForceZ, ?> forcesMock = Pattern.<ForceZ>begin("start").where(new 
> SimpleCondition<ForceZ>() {
>     @Override
>     public boolean filter(ForceZ value) {
>         // This is called as expected
>         return true;
>     }
> });
> // Print pattern results
> // This actually prints all incoming events as expected
> CEP.pattern(forcesMock, mock)
>         .select(new PatternSelectFunction<ForceZ, ForceZ>() {
>             @Override
>             public ForceZ select(Map<String, List<ForceZ>> pattern){
>                 return pattern.get("start").get(0);
>             }
>         }).print();
> // Create another stream based on a sliding window over the input stream
> SingleOutputStreamOperator<Interval> intervals = forces
>         .countWindowAll(2, 1)
>         .process(new ForceWindowFunction());
> // Define mock pattern
> Pattern<Interval, Interval> intervalMock = 
> Pattern.<Interval>begin("start").where(new SimpleCondition<Interval>() {
>     @Override
>     public boolean filter(Interval value) throws Exception {
>         // This is never called
>         return true;
>     }
> });
> // Print pattern results
> // Doesn't print anything since the mock condition is never called
> CEP.pattern(intervals, intervalMock)
>         .select(new PatternSelectFunction<Interval, Interval>() {
>             @Override
>             public Interval select(Map<String, List<Interval>> pattern) 
> throws Exception {
>                 return pattern.get("start").get(0);
>             }
>         }).print();
> {code}
> Either I'm doing something wrong or this is a major bug.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to