[ https://issues.apache.org/jira/browse/FLINK-30562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17655011#comment-17655011 ]
Thomas Wozniakowski commented on FLINK-30562:
---------------------------------------------

Hi [~bgeng777],

I've made some progress in narrowing down the problem. I am still working on a reproducible code snippet I can share, but the problem is definitely related to *Side Outputs*.

For context, we use Side Outputs to route events to different CEP operators depending on a Customer ID value (different customers are interested in different CEP sequences). We previously used the {{.split()}} operator before it was deprecated.

We set up the side outputs with a call like this (I have dramatically simplified the code, but the problem still occurs with the code in this form):

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

SingleOutputStreamOperator<PlatformEvent> streamWithSideOutputs =
        stream.process(new BrandedSideOutputFunction());

// Where the side output function ...
public static class BrandedSideOutputFunction extends ProcessFunction<PlatformEvent, PlatformEvent> {
    private final OutputTag<PlatformEvent> outputTag =
            new OutputTag<>("RED_BRAND", TypeInformation.of(PlatformEvent.class));

    @Override
    public void processElement(PlatformEvent value, Context ctx, Collector<PlatformEvent> out) {
        // Every event goes to the single hardcoded side output and to the main output.
        ctx.output(outputTag, value);
        out.collect(value);
    }
}
{code}

You'll note that this side output function only outputs to one hardcoded side output. The real code is more complex but, as I say, the problem still occurs with the code as written above.

With this {{.process(...)}} call upstream of the CEP operators, and {{parallelism}} set to a value greater than 1, the Patterns fail to be detected roughly a third of the time. Note that this happens regardless of whether I connect the CEP operator to the *main* {{DataStream}} or to a side output via {{.getSideOutput(tag)}}.

If {{parallelism}} is set to 1, or if I remove the side-output-generating {{.process(...)}} call and connect the CEP operator directly to the existing {{DataStream}}, the Patterns are detected 100% of the time.
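The routing described above (events fanned out to per-customer CEP pipelines by Customer ID) can be modelled in plain Java to make the data flow explicit. This is a minimal sketch, not Flink code and not the reporter's real implementation: `CustomerEvent`, `SideOutputRouter`, the customer IDs and the customer-to-tag map are hypothetical, and only the `RED_BRAND` tag name comes from the snippet above. Like `BrandedSideOutputFunction`, it sends every event to the main output and additionally to the side output selected for that event's customer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event type; only the customer ID matters for routing.
final class CustomerEvent {
    final String customerId;
    final String payload;

    CustomerEvent(String customerId, String payload) {
        this.customerId = customerId;
        this.payload = payload;
    }
}

// Plain-Java model of side-output routing (not Flink code): every event goes to
// the main output, and events from a configured customer also go to that
// customer's side output, keyed here by tag name.
class SideOutputRouter {
    private final Map<String, String> tagByCustomer; // customer ID -> tag name
    private final List<CustomerEvent> mainOutput = new ArrayList<>();
    private final Map<String, List<CustomerEvent>> sideOutputs = new HashMap<>();

    SideOutputRouter(Map<String, String> tagByCustomer) {
        this.tagByCustomer = new HashMap<>(tagByCustomer);
    }

    // Same shape as BrandedSideOutputFunction.processElement:
    // side output first (like ctx.output), then main output (like out.collect).
    void processElement(CustomerEvent value) {
        String tag = tagByCustomer.get(value.customerId);
        if (tag != null) {
            sideOutputs.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
        }
        mainOutput.add(value);
    }

    List<CustomerEvent> mainOutput() {
        return mainOutput;
    }

    List<CustomerEvent> sideOutput(String tag) {
        return sideOutputs.getOrDefault(tag, new ArrayList<>());
    }
}
```

In the Flink job itself, the equivalent step would presumably be choosing an `OutputTag` per event inside `processElement` and calling `ctx.output(tag, value)`, with each CEP pipeline reading its stream via `getSideOutput(tag)`.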
There seems to be something up with the interaction between side outputs, parallelism and the CEP operator in Flink 1.15.0+. I will keep working on producing a project I can share that reproduces this problem, but hopefully this gives you something to go on.

> Patterns are not emitted with parallelism >1 since 1.15.x+
> ----------------------------------------------------------
>
> Key: FLINK-30562
> URL: https://issues.apache.org/jira/browse/FLINK-30562
> Project: Flink
> Issue Type: Bug
> Components: Library / CEP
> Affects Versions: 1.16.0, 1.15.3
> Environment: Problem observed in:
> Production:
> Dockerised Flink cluster running in AWS Fargate, sourcing from AWS Kinesis and sinking to AWS SQS
> Local:
> Completely local MiniCluster-based test with no external sinks or sources
> Reporter: Thomas Wozniakowski
> Priority: Major
>
> (Apologies for the speculative and somewhat vague ticket, but I wanted to raise this while I am investigating to see if anyone has suggestions to help me narrow down the problem.)
>
> We are encountering an issue where our streaming Flink job has stopped working correctly since Flink 1.15.3. This problem is also present on Flink 1.16.0. The Keyed CEP operators that our job uses are no longer emitting Patterns reliably, but critically *this is only happening when parallelism is set to a value greater than 1*.
>
> Our local build tests were previously set up using in-JVM `MiniCluster` instances or dockerised Flink clusters, all with a parallelism of 1, so this problem was not caught, and it caused an outage when we upgraded the cluster version in production.
>
> Observing the job using the Flink console in production, I can see that events are *arriving* at the Keyed CEP operators, but no Pattern events are being emitted from any of the operators. Furthermore, all the reported Watermark values are zero, though I don't know if that is a red herring, as Watermark reporting seems to have changed since 1.14.x.
>
> I am currently attempting to create a stripped-down version of our streaming job to demonstrate the problem, but this is quite tricky to set up. In the meantime, I would appreciate any hints that could point me in the right direction.
>
> I have isolated the problem to the Keyed CEP operator by removing our real sinks and sources from the failing test. I still see the erroneous behaviour when setting up a job as:
> # Events are read from a list using `env.fromCollection( ... )`
> # CEP operator processes events
> # Output is captured in another list for assertions
>
> My best guess at the moment is something to do with Watermark emission. There seem to have been changes related to watermark alignment; perhaps these have caused some kind of regression in the CEP library? To reiterate, *this problem only occurs with parallelism of 2 or more. Setting the parallelism to 1 immediately fixes the issue*.
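The watermark hypothesis can be made concrete with a simplified model of two mechanisms that only come into play once parallelism exceeds 1. An operator with several input channels advances its event-time clock to the minimum of the watermarks received on those channels, and an event-time CEP operator buffers incoming events and only processes those whose timestamp is at or below the current watermark. The sketch below is a hypothetical plain-Java model (`WatermarkModel` is an illustrative name, not a Flink class); it ignores idle-channel handling, which Flink does perform, and it does not establish that this is the cause of FLINK-30562.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, simplified model (not Flink source code).
// 1. The operator's watermark is the minimum of its input channels' watermarks.
// 2. Buffered events are processed only once their timestamp <= that watermark,
//    mirroring how an event-time CEP operator defers matching.
// Idle-channel handling, which Flink does have, is deliberately omitted.
class WatermarkModel {

    // Latest watermark seen on each input channel (one per upstream subtask).
    private final long[] channelWatermarks;
    // Event timestamps waiting for the watermark to pass them, smallest first.
    private final PriorityQueue<Long> buffered = new PriorityQueue<>();
    // Timestamps handed on to the (modelled) pattern matcher, in release order.
    private final List<Long> released = new ArrayList<>();

    WatermarkModel(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No watermark received yet on any channel.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    void onEvent(long timestamp) {
        buffered.add(timestamp);
    }

    void onWatermark(int channel, long watermark) {
        // Watermarks never move backwards on a channel.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long combined = combinedWatermark();
        // Release every buffered event the combined watermark has passed.
        while (!buffered.isEmpty() && buffered.peek() <= combined) {
            released.add(buffered.poll());
        }
    }

    List<Long> released() {
        return released;
    }
}
```

With two channels, if only channel 0 ever reports a watermark, the combined watermark stays at `Long.MIN_VALUE` and buffered events are never released, which resembles the reported symptom of events arriving while no patterns are emitted. Comparing the per-subtask watermarks of the operators upstream of the CEP operator might be one way to check whether a single subtask is holding the combined watermark back.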