[https://issues.apache.org/jira/browse/FLINK-39752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Dennis-Mircea Ciupitu updated FLINK-39752:
------------------------------------------
Description:
h1. Summary
The operator creates more than one {{EventRecorder}} instance even though the
recorder is a fixed, operator-scoped dependency that should be created once and
shared. This leaves an inconsistency where, for some controllers, the recorder
used by the reconciler and observer is a different instance from the one the
{{FlinkService}} emits through.
h1. Background
{{FlinkResourceContextFactory}} holds a single {{EventRecorder}} and uses it
when creating the {{FlinkService}}. The FlinkDeployment controller reuses that
same shared instance, but the FlinkSessionJob and FlinkStateSnapshot
controllers each create their own recorder via {{EventRecorder.create(...)}} in
{{FlinkOperator}}. As a result, for those two controllers the recorder feeding
the reconciler and observer is not the same instance the {{FlinkService}} uses.
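The wiring described above can be sketched in Java as follows. This is for illustration only: {{Recorder}}, {{ContextFactory}} and {{CurrentWiring}} are hypothetical stand-ins rather than the operator's real classes, and each {{EventRecorder.create(...)}} call is modelled as a plain constructor call over the same shared listener list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for EventRecorder: a stateless dispatcher over shared listeners.
final class Recorder {
    private final List<Consumer<String>> listeners;

    Recorder(List<Consumer<String>> listeners) {
        this.listeners = listeners;
    }

    void emit(String reason) {
        listeners.forEach(listener -> listener.accept(reason));
    }
}

// Hypothetical stand-in for FlinkResourceContextFactory: holds one recorder
// and passes it to every FlinkService it creates.
final class ContextFactory {
    private final Recorder recorder;

    ContextFactory(Recorder recorder) {
        this.recorder = recorder;
    }

    Recorder recorderUsedByFlinkService() {
        return recorder;
    }
}

// Mirrors the wiring described in this ticket, not the actual FlinkOperator code.
final class CurrentWiring {
    final List<Consumer<String>> listeners = new ArrayList<>();
    final Recorder operatorRecorder = new Recorder(listeners);
    final ContextFactory ctxFactory = new ContextFactory(operatorRecorder);

    // The FlinkDeployment controller reuses the factory's instance.
    final Recorder deploymentRecorder = operatorRecorder;
    // The FlinkSessionJob and FlinkStateSnapshot controllers each build their own,
    // like the EventRecorder.create(...) calls in FlinkOperator.
    final Recorder sessionJobRecorder = new Recorder(listeners);
    final Recorder snapshotRecorder = new Recorder(listeners);
}
```

In this model {{deploymentRecorder}} is the same object the factory hands to the {{FlinkService}}, while {{sessionJobRecorder}} and {{snapshotRecorder}} are separate instances.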
h1. Why this matters
{{EventRecorder}} is currently a stateless dispatcher over the shared resource
listeners and the Kubernetes client, so today the duplicate instances behave
identically and the inconsistency is invisible. It becomes a real bug the
moment any per-instance state is added to {{EventRecorder}} (for example, an
event de-duplication cache or a rate limiter), because events emitted on the
{{FlinkService}} path and events emitted on the controller path would then be
tracked by separate state: an event suppressed as a duplicate on one path could
still be emitted on the other.
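To make the failure mode concrete, the sketch below assumes a hypothetical de-duplication cache has been added to the recorder. {{DedupRecorder}}, {{DedupDivergence}} and the event reason are illustrative names only. The same logical event is emitted once from the controller path and once from the {{FlinkService}} path.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical EventRecorder variant with per-instance state (a de-duplication cache).
final class DedupRecorder {
    private final List<String> sink;                            // shared output, standing in for the Kubernetes API
    private final Set<String> alreadyEmitted = new HashSet<>(); // per-instance state

    DedupRecorder(List<String> sink) {
        this.sink = sink;
    }

    // Emits an event unless this instance has already emitted the same one.
    void emit(String resource, String reason) {
        String key = resource + "/" + reason;
        if (alreadyEmitted.add(key)) {
            sink.add(key);
        }
    }
}

final class DedupDivergence {
    // Emits one logical event from the controller path and again from the FlinkService path.
    static List<String> emitFromBothPaths(boolean shareInstance) {
        List<String> sink = new ArrayList<>();
        DedupRecorder controllerPath = new DedupRecorder(sink);
        DedupRecorder flinkServicePath = shareInstance ? controllerPath : new DedupRecorder(sink);
        controllerPath.emit("session-job-1", "SavepointFailed");
        flinkServicePath.emit("session-job-1", "SavepointFailed");
        return sink;
    }
}
```

With one shared instance ({{emitFromBothPaths(true)}}) the second emission is suppressed and the sink holds one event; with two instances ({{emitFromBothPaths(false)}}) each cache only sees its own emission, so the duplicate reaches the sink and it holds two.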
h1. Goal
Ensure a single operator-scoped {{EventRecorder}} is created once and reused by
every controller and by the {{FlinkResourceContextFactory}}, removing the
duplicate per-controller instances. This keeps the recorder a shared, fixed
dependency owned by the factory, without having to pass it into every
{{getResourceContext}} call.
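A sketch of the target wiring, assuming simplified hypothetical types ({{SharedRecorder}}, {{SharedContextFactory}}, {{ControllerStub}}, {{OperatorWiring}}) rather than the operator's real classes: the recorder is built once, and the same reference is handed to the factory and to each controller, so no controller calls {{EventRecorder.create(...)}} itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for EventRecorder.
final class SharedRecorder {
    private final List<Consumer<String>> listeners;

    SharedRecorder(List<Consumer<String>> listeners) {
        this.listeners = listeners;
    }

    void emit(String reason) {
        listeners.forEach(listener -> listener.accept(reason));
    }
}

// Hypothetical stand-in for FlinkResourceContextFactory: owns the recorder it
// hands to every FlinkService, so getResourceContext(...) needs no recorder argument.
final class SharedContextFactory {
    private final SharedRecorder eventRecorder;

    SharedContextFactory(SharedRecorder eventRecorder) {
        this.eventRecorder = eventRecorder;
    }

    SharedRecorder recorderUsedByFlinkService() {
        return eventRecorder;
    }
}

// Hypothetical controller: receives its recorder instead of creating one.
final class ControllerStub {
    final String name;
    final SharedRecorder eventRecorder;

    ControllerStub(String name, SharedRecorder eventRecorder) {
        this.name = name;
        this.eventRecorder = eventRecorder;
    }
}

// Sketch of the operator wiring: one recorder, created once, shared everywhere.
final class OperatorWiring {
    final SharedRecorder eventRecorder;
    final SharedContextFactory ctxFactory;
    final List<ControllerStub> controllers = new ArrayList<>();

    OperatorWiring(List<Consumer<String>> listeners) {
        this.eventRecorder = new SharedRecorder(listeners); // created exactly once
        this.ctxFactory = new SharedContextFactory(eventRecorder);
        for (String name : List.of("FlinkDeployment", "FlinkSessionJob", "FlinkStateSnapshot")) {
            controllers.add(new ControllerStub(name, eventRecorder));
        }
    }

    boolean allShareOneRecorder() {
        SharedRecorder factoryRecorder = ctxFactory.recorderUsedByFlinkService();
        return controllers.stream().allMatch(c -> c.eventRecorder == factoryRecorder);
    }
}
```

Because the factory already owns the instance in this design, the controller path and the {{FlinkService}} path cannot drift apart, and {{getResourceContext}} keeps its existing shape.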
h1. Notes
This is a behavior-preserving consistency fix. No public API, CRD, or
reconciliation behavior changes. It is covered by the existing controller tests.
was:
h1. Background
{{FlinkResourceContextFactory}} currently holds an {{EventRecorder}} as a
constructor-injected field. That recorder is created once at {{FlinkOperator}}
construction time and shared across every controller, regardless of which
controller is calling {{getResourceContext(...)}}.
At the same time, {{FlinkSessionJobController}} and
{{FlinkStateSnapshotController}} each create their own local {{EventRecorder}}
via {{EventRecorder.create(client, listeners)}} during {{register*Controller}},
and {{FlinkBlueGreenDeploymentController}} has none at all. The result is an
inconsistency:
* Reconcile code in a controller emits events through that controller's local
recorder.
* The {{FlinkService}} built inside the same reconcile (via
{{ctxFactory.getFlinkService(ctx)}}) emits events through the operator-scoped
recorder held on the factory.
In other words, a single reconcile uses two different {{EventRecorder}}
instances for events that conceptually belong to the same controller.
h1. Why this matters
Today {{EventRecorder}} is effectively stateless. Every public method takes the
{{KubernetesClient}} as a parameter and forwards to {{EventUtils}}, and the
only instance state is two {{BiConsumer}} closures over {{client}} and
{{listeners}}. Two recorders constructed with the same {{client}} and the same
{{listeners}} behave identically, so the asymmetry described above does not
cause observable bugs in production right now.
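The statelessness argument can be illustrated with a hypothetical sketch. {{FakeClient}} stands in for {{KubernetesClient}}, and where the ticket describes two {{BiConsumer}} closures the sketch keeps one, since a single closure is enough to show the point: two recorders built over the same client and listeners produce the same side effects, so callers cannot tell them apart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical stand-in for KubernetesClient: records the events it is asked to create.
final class FakeClient {
    final List<String> createdEvents = new ArrayList<>();
}

// Hypothetical stateless recorder: its only state is a closure over the shared client and listeners.
final class StatelessRecorder {
    private final BiConsumer<String, String> onEvent;

    StatelessRecorder(FakeClient client, List<String> listenerLog) {
        this.onEvent = (resource, reason) -> {
            client.createdEvents.add(resource + ":" + reason); // "create" the Kubernetes event
            listenerLog.add(reason);                           // notify the shared listeners
        };
    }

    void triggerEvent(String resource, String reason) {
        onEvent.accept(resource, reason);
    }
}
```

Adding any field beyond such closures (a cache, a counter, a limiter) is exactly what breaks this equivalence.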
It is, however, a latent bug. The moment any per-instance state is added to
{{EventRecorder}} (per-controller dedup cache, per-controller listener subset,
per-controller metrics, rate limiting), events emitted by {{FlinkService}} will
silently route through the wrong recorder. That failure mode is hard to detect
in review because the divergence spans two files and the type signatures look
correct.
The current declaration pattern (a local {{var eventRecorder = EventRecorder.create(...)}}
in {{registerSessionJobController}} and {{registerSnapshotController}}) also
misleads readers into thinking each of those controllers owns its recorder
end-to-end, when in fact the {{FlinkService}} built by the factory keeps using
the operator-scoped one.
h1. Proposal
Thread {{EventRecorder}} through the context lifecycle instead of holding it on
the factory.
* {{FlinkResourceContextFactory}} no longer takes an {{EventRecorder}} as a
constructor argument.
* {{FlinkResourceContext}} (and its concrete subclasses) gains an
{{EventRecorder}} field.
* {{getResourceContext(resource, josdkContext, eventRecorder)}} accepts the
recorder per call and stores it on the context.
* {{getFlinkService(ctx)}} reads the recorder via {{ctx.getEventRecorder()}}
instead of from a factory field.
* Each {{register*Controller}} method in {{FlinkOperator}} owns its own
{{EventRecorder}} (including {{registerBlueGreenController}}, which gains one
for the first time) and passes it into the controller. Every
{{ctxFactory.getResourceContext(...)}} call site in production code passes the
calling controller's recorder.
This mirrors how {{josdkContext}} is already handled, since both are
per-reconcile inputs rather than factory-lifetime dependencies.
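The proposed threading can be sketched like this, under simplified assumptions: {{RecorderStub}}, {{FlinkServiceStub}}, {{ResourceContext}} and {{PerCallContextFactory}} are hypothetical stand-ins for {{EventRecorder}}, {{FlinkService}}, {{FlinkResourceContext}} and {{FlinkResourceContextFactory}}, and {{josdkContext}} is typed as {{Object}}.

```java
// Hypothetical stand-in for EventRecorder.
final class RecorderStub {
    private final String owner;

    RecorderStub(String owner) {
        this.owner = owner;
    }

    String owner() {
        return owner;
    }
}

// Hypothetical stand-in for FlinkService: emits through whatever recorder it was built with.
final class FlinkServiceStub {
    final RecorderStub eventRecorder;

    FlinkServiceStub(RecorderStub eventRecorder) {
        this.eventRecorder = eventRecorder;
    }
}

// Hypothetical stand-in for FlinkResourceContext: carries both per-reconcile inputs.
final class ResourceContext<R> {
    private final R resource;
    private final Object josdkContext;
    private final RecorderStub eventRecorder;

    ResourceContext(R resource, Object josdkContext, RecorderStub eventRecorder) {
        this.resource = resource;
        this.josdkContext = josdkContext;
        this.eventRecorder = eventRecorder;
    }

    R getResource() { return resource; }
    Object getJosdkContext() { return josdkContext; }
    RecorderStub getEventRecorder() { return eventRecorder; }
}

// Hypothetical stand-in for FlinkResourceContextFactory: no recorder constructor argument.
final class PerCallContextFactory {
    <R> ResourceContext<R> getResourceContext(R resource, Object josdkContext, RecorderStub eventRecorder) {
        return new ResourceContext<>(resource, josdkContext, eventRecorder);
    }

    FlinkServiceStub getFlinkService(ResourceContext<?> ctx) {
        // Read the recorder from the context instead of from a factory field.
        return new FlinkServiceStub(ctx.getEventRecorder());
    }
}
```

In this model, whichever recorder the calling controller passes into {{getResourceContext(...)}} is the one the {{FlinkService}} ends up using.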
h1. Benefits
* Removes the latent bug where {{FlinkService}}-emitted events would route
through the wrong recorder once per-instance state is added.
* Aligns the runtime behavior with the mental model implied by the
{{var eventRecorder = EventRecorder.create(...)}} pattern already present in
{{registerSessionJobController}} and {{registerSnapshotController}}.
* Keeps shared singletons shared. The factory continues to own
{{clientExecutorService}}, {{artifactManager}}, {{resourceMetricGroups}}, and
{{lastRecordedExceptionCache}}, so no thread pools or caches get duplicated.
* Makes asymmetric paths (blue-green, snapshot operating on a
{{FlinkDeployment}} it does not own) explicit rather than implicit, since each
call site has to pick which recorder to pass.
h1. Out of scope
* No behavioral change is intended in this ticket. Existing tests must continue
to pass without modification beyond the mechanical signature updates.
* Other follow-ups that build on this (per-controller dedup, richer listener
wiring, event metrics) are deliberately not part of this change.
> Thread EventRecorder through FlinkResourceContext instead of holding it on
> the factory
> --------------------------------------------------------------------------------------
>
> Key: FLINK-39752
> URL: https://issues.apache.org/jira/browse/FLINK-39752
> Project: Flink
> Issue Type: Improvement
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.15.0
> Reporter: Dennis-Mircea Ciupitu
> Priority: Major
> Labels: pull-request-available
> Fix For: kubernetes-operator-1.16.0
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)