[
https://issues.apache.org/jira/browse/FLINK-30562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17655011#comment-17655011
]
Thomas Wozniakowski commented on FLINK-30562:
---------------------------------------------
Hi [~bgeng777]
I've made some progress in narrowing down the problem. I am still working on
producing a reproducible code snippet I can share, but the problem is
definitely related to *Side Outputs*.
For context, we use Side Outputs to route events to different CEP operators
depending on a Customer ID value (different customers are interested in
different CEP sequences). We previously used the {{.split()}} operator before
it was deprecated.
We set up the side outputs with a call like this (I have dramatically
simplified the code, but the problem still occurs with the code in this
form):
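{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Send every event to the main output and to the "RED_BRAND" side output.
SingleOutputStreamOperator<PlatformEvent> streamWithSideOutputs =
    stream.process(new BrandedSideOutputFunction());

// The side output function (simplified; the real one picks the tag per customer ID).
public static class BrandedSideOutputFunction
        extends ProcessFunction<PlatformEvent, PlatformEvent> {

    private final OutputTag<PlatformEvent> outputTag =
        new OutputTag<>("RED_BRAND", TypeInformation.of(PlatformEvent.class));

    @Override
    public void processElement(PlatformEvent value, Context ctx, Collector<PlatformEvent> out) {
        ctx.output(outputTag, value); // emit to the side output
        out.collect(value);           // and to the main output
    }
}
{code}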
Note that this side output function only emits to a single, hardcoded side
output. The real code is more complex but, as I say, the problem still occurs
with the code as written above.
With this {{.process(...)}} call upstream of the CEP operators, and the
{{parallelism}} set to a value greater than 1, the Patterns fail to be
detected roughly one third of the time. This happens whether I connect
the CEP operator to the *main* {{DataStream}} or to a side output via
{{.getSideOutput(tag)}}.
If the {{parallelism}} is set to 1, or if I remove the side-output generating
{{.process(...)}} call and connect the CEP operator directly to the existing
{{DataStream}}, the Patterns will be detected 100% of the time.
There seems to be a problem with the interaction between side outputs,
parallelism and the CEP operator in Flink 1.15.0+.
I will keep working on producing a project I can share that reproduces this
problem, but hopefully this gives you something to go on.
> Patterns are not emitted with parallelism >1 since 1.15.x+
> ----------------------------------------------------------
>
> Key: FLINK-30562
> URL: https://issues.apache.org/jira/browse/FLINK-30562
> Project: Flink
> Issue Type: Bug
> Components: Library / CEP
> Affects Versions: 1.16.0, 1.15.3
> Environment: Problem observed in:
> Production:
> Dockerised Flink cluster running in AWS Fargate, sourced from AWS Kinesis and
> sink to AWS SQS
> Local:
> Completely local MiniCluster based test with no external sinks or sources
> Reporter: Thomas Wozniakowski
> Priority: Major
>
> (Apologies for the speculative and somewhat vague ticket, but I wanted to
> raise this while I am investigating to see if anyone has suggestions to help
> me narrow down the problem.)
> We are encountering an issue where our streaming Flink job has stopped
> working correctly since Flink 1.15.3. This problem is also present on Flink
> 1.16.0. The Keyed CEP operators that our job uses are no longer emitting
> Patterns reliably, but critically *this is only happening when parallelism is
> set to a value greater than 1*.
> Our local build tests were previously set up using in-JVM {{MiniCluster}}
> instances, or dockerised Flink clusters, all set with a parallelism of 1, so
> this problem was not caught and it caused an outage when we upgraded the
> cluster version in production.
> Observing the job using the Flink console in production, I can see that
> events are *arriving* into the Keyed CEP operators, but no Pattern events are
> being emitted out of any of the operators. Furthermore, all the reported
> Watermark values are zero, though I don't know if that is a red herring, as
> Watermark reporting seems to have changed since 1.14.x.
> I am currently attempting to create a stripped down version of our streaming
> job to demonstrate the problem, but this is quite tricky to set up. In the
> meantime I would appreciate any hints that could point me in the right
> direction.
> I have isolated the problem to the Keyed CEP operator by removing our real
> sinks and sources from the failing test. I am still seeing the erroneous
> behaviour when setting up a job as:
> # Events are read from a list using {{env.fromCollection(...)}}
> # CEP operator processes events
> # Output is captured in another list for assertions
> My best guess at the moment is that it has something to do with Watermark
> emission. There seem to have been changes related to watermark alignment;
> perhaps these have caused some kind of regression in the CEP library? To
> reiterate, *this problem only occurs with parallelism of 2 or more. Setting
> the parallelism to 1 immediately fixes the issue.*
--
This message was sent by Atlassian Jira
(v8.20.10#820010)