[ 
https://issues.apache.org/jira/browse/FLINK-30562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17655011#comment-17655011
 ] 

Thomas Wozniakowski commented on FLINK-30562:
---------------------------------------------

Hi [~bgeng777]

I've made some progress in narrowing down the problem. I am still working on 
producing a reproducible code snippet I can share, but the problem is 
definitely related to *Side Outputs*.

For context, we use Side Outputs to route events to different CEP operators 
depending on a Customer ID value (different customers are interested in 
different CEP sequences). We used the {{.split()}} operator for this before 
it was deprecated.

We set up the side outputs with a call like this (I have dramatically 
simplified the code but the problem is still occurring with the code in this 
form):

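{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// "stream" is the existing DataStream of PlatformEvent
SingleOutputStreamOperator<PlatformEvent> streamWithSideOutputs =
        stream.process(new BrandedSideOutputFunction());

// Where the side output function ...

    public static class BrandedSideOutputFunction
            extends ProcessFunction<PlatformEvent, PlatformEvent> {

        // The single, hardcoded side output used in this simplified version
        private final OutputTag<PlatformEvent> outputTag =
                new OutputTag<>("RED_BRAND", TypeInformation.of(PlatformEvent.class));

        @Override
        public void processElement(PlatformEvent value, Context ctx,
                Collector<PlatformEvent> out) {
            // Emit every event to the side output ...
            ctx.output(outputTag, value);
            // ... and also forward it on the main output
            out.collect(value);
        }
    }
{code}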

You'll note that this side output function only ever emits to a single, 
hardcoded side output. The real code is more complex but, as I say, the 
problem still occurs with the code as written above.

With this {{.process(...)}} call upstream of the CEP operators, and the 
{{parallelism}} set to a value greater than 1, the Patterns fail to be 
detected roughly one third of the time. This happens whether I connect the 
CEP operator to the *main* {{DataStream}} or to a side output via 
{{.getSideOutput(tag)}}.

If the {{parallelism}} is set to 1, or if I remove the side-output generating 
{{.process(...)}} call and connect the CEP operator directly to the existing 
{{DataStream}}, the Patterns will be detected 100% of the time.

There seems to be a problem with the interaction between side outputs, 
parallelism and the CEP operator in Flink 1.15.0+.

I will keep working on a project I can share that reproduces this problem, 
but hopefully this gives you something to go on?


> Patterns are not emitted with parallelism >1 since 1.15.x+
> ----------------------------------------------------------
>
>                 Key: FLINK-30562
>                 URL: https://issues.apache.org/jira/browse/FLINK-30562
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.16.0, 1.15.3
>         Environment: Problem observed in:
> Production:
> Dockerised Flink cluster running in AWS Fargate, sourced from AWS Kinesis and 
> sink to AWS SQS
> Local:
> Completely local MiniCluster based test with no external sinks or sources
>            Reporter: Thomas Wozniakowski
>            Priority: Major
>
> (Apologies for the speculative and somewhat vague ticket, but I wanted to 
> raise this while I am investigating to see if anyone has suggestions to help 
> me narrow down the problem.)
> We are encountering an issue where our streaming Flink job has stopped 
> working correctly since Flink 1.15.3. This problem is also present on Flink 
> 1.16.0. The Keyed CEP operators that our job uses are no longer emitting 
> Patterns reliably, but critically *this is only happening when parallelism is 
> set to a value greater than 1*. 
> Our local build tests were previously set up using in-JVM `MiniCluster` 
> instances, or dockerised Flink clusters all set with a parallelism of 1, so 
> this problem was not caught and it caused an outage when we upgraded the 
> cluster version in production.
> Observing the job using the Flink console in production, I can see that 
> events are *arriving* into the Keyed CEP operators, but no Pattern events are 
> being emitted out of any of the operators. Furthermore, all the reported 
> Watermark values are zero, though I don't know if that is a red herring, as 
> Watermark reporting seems to have changed since 1.14.x.
> I am currently attempting to create a stripped down version of our streaming 
> job to demonstrate the problem, but this is quite tricky to set up. In the 
> meantime I would appreciate any hints that could point me in the right 
> direction.
> I have isolated the problem to the Keyed CEP operator by removing our real 
> sinks and sources from the failing test. I am still seeing the erroneous 
> behaviour when setting up a job as:
> # Events are read from a list using `env.fromCollection( ... )`
> # CEP operator processes events
> # Output is captured in another list for assertions
> My best guess at the moment is something to do with Watermark emission? There 
> seem to have been changes related to watermark alignment; perhaps these have 
> caused some kind of regression in the CEP library? To reiterate, *this 
> problem only occurs with parallelism of 2 or more. Setting the parallelism to 
> 1 immediately fixes the issue.*
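
One way to picture the watermark guess above is with a toy model. This is plain Java, not Flink code, and the class WatermarkModel and its methods are hypothetical names invented purely for illustration. It assumes two behaviours described in the Flink documentation: an operator with several input channels uses the minimum of their watermarks as its event-time clock, and in event time the CEP operator buffers elements until a watermark passes their timestamps. Whether this is what actually causes the missing Patterns here is unconfirmed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model (NOT Flink code) of an event-time operator fed by parallel input channels. */
final class WatermarkModel {
    // Last watermark seen on each input channel; nothing seen yet = Long.MIN_VALUE.
    private final long[] channelWatermarks;
    // Event timestamps waiting for the watermark to pass them, smallest first.
    private final PriorityQueue<Long> buffered = new PriorityQueue<>();

    /** @param inputChannels number of upstream parallel subtasks, must be at least 1 */
    WatermarkModel(int inputChannels) {
        channelWatermarks = new long[inputChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Buffers an event; nothing is released until the watermark advances. */
    void onEvent(long timestamp) {
        buffered.add(timestamp);
    }

    /** Records a watermark from one channel and returns the events it releases. */
    List<Long> onWatermark(int channel, long watermark) {
        // Watermarks never move backwards on a channel.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long current = currentWatermark();
        List<Long> released = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.peek() <= current) {
            released.add(buffered.poll());
        }
        return released;
    }

    /** The operator's event-time clock: the minimum over all input channels. */
    long currentWatermark() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }
}
```

In this model, with a single channel, buffering events at timestamps 10 and 20 and then calling onWatermark(0, 25) releases both. With two channels where channel 1 never reports a watermark, the same call releases nothing, which from the outside would look like events arriving at the operator with no Patterns coming out.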



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
