[ 
https://issues.apache.org/jira/browse/FLINK-39752?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dennis-Mircea Ciupitu updated FLINK-39752:
------------------------------------------
    Description: 
h1. Summary

The operator creates more than one {{EventRecorder}} instance even though the 
recorder is a fixed, operator-scoped dependency that should be created once and 
shared. This leaves an inconsistency where, for some controllers, the recorder 
used by the reconciler and observer is a different instance from the one the 
{{FlinkService}} emits through.

h1. Background

{{FlinkResourceContextFactory}} holds a single {{EventRecorder}} and uses it 
when creating the {{FlinkService}}. The FlinkDeployment controller reuses that 
same shared instance, but the FlinkSessionJob and FlinkStateSnapshot 
controllers each create their own recorder via {{EventRecorder.create(...)}} in 
{{FlinkOperator}}. As a result, for those two controllers the recorder feeding 
the reconciler and observer is not the same instance the {{FlinkService}} uses.
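The wiring described above can be sketched with simplified, hypothetical stand-ins: {{Recorder}}, {{Service}}, {{ContextFactory}} and {{Controller}} model {{EventRecorder}}, {{FlinkService}}, {{FlinkResourceContextFactory}} and a controller's reconciler/observer. They are not the operator's real classes or signatures. The sketch only shows the identity mismatch: the FlinkDeployment controller's recorder is the factory's, while the session job and snapshot controllers hold their own.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for EventRecorder: forwards every event to a shared sink,
// which plays the role of the shared Kubernetes client and resource listeners.
final class Recorder {
    private final List<String> sink;

    Recorder(List<String> sink) {
        this.sink = sink;
    }

    void emit(String reason) {
        sink.add(reason);
    }
}

// Hypothetical stand-in for FlinkService: emits through whatever recorder it was given.
final class Service {
    final Recorder recorder;

    Service(Recorder recorder) {
        this.recorder = recorder;
    }
}

// Hypothetical stand-in for FlinkResourceContextFactory: holds one recorder and
// hands it to every service it builds.
final class ContextFactory {
    private final Recorder recorder;

    ContextFactory(Recorder recorder) {
        this.recorder = recorder;
    }

    Service getFlinkService() {
        return new Service(recorder);
    }
}

// Hypothetical stand-in for a controller: its reconciler and observer emit
// through `recorder`, while its service comes from the factory.
final class Controller {
    final Recorder recorder;
    final ContextFactory factory;

    Controller(Recorder recorder, ContextFactory factory) {
        this.recorder = recorder;
        this.factory = factory;
    }

    boolean sameRecorderAsService() {
        return recorder == factory.getFlinkService().recorder;
    }
}

final class CurrentWiring {
    // Returns the deployment, session job and snapshot controllers, in that order.
    static List<Controller> wire() {
        List<String> sink = new ArrayList<>();
        Recorder shared = new Recorder(sink);
        ContextFactory factory = new ContextFactory(shared);
        List<Controller> controllers = new ArrayList<>();
        controllers.add(new Controller(shared, factory));             // FlinkDeployment: reuses the shared recorder
        controllers.add(new Controller(new Recorder(sink), factory)); // FlinkSessionJob: creates its own
        controllers.add(new Controller(new Recorder(sink), factory)); // FlinkStateSnapshot: creates its own
        return controllers;
    }

    public static void main(String[] args) {
        for (Controller c : wire()) {
            System.out.println(c.sameRecorderAsService()); // true, false, false
        }
    }
}
```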

h1. Why this matters

{{EventRecorder}} is currently a stateless dispatcher over the shared resource 
listeners and the Kubernetes client, so today the duplicate instances behave 
identically and the inconsistency is invisible. It becomes a real bug the 
moment any per-instance state is added to {{EventRecorder}} (for example an 
event de-duplication cache or rate limiter), because events emitted on the 
{{FlinkService}} path and events emitted on the controller path would then 
diverge.
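To make the failure mode concrete, here is a sketch assuming a hypothetical {{DedupRecorder}} that carries a per-instance de-duplication cache. {{EventRecorder}} has no such cache today; the class only models the kind of state the paragraph above anticipates. The same event is emitted once on the controller path and once on the {{FlinkService}} path.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical recorder with per-instance state (a de-duplication cache).
// EventRecorder has no such cache today; this models the future change.
final class DedupRecorder {
    private final List<String> sink;                  // shared, like the client and listeners
    private final Set<String> seen = new HashSet<>(); // per-instance state

    DedupRecorder(List<String> sink) {
        this.sink = sink;
    }

    void emit(String eventKey) {
        // Forward only the first occurrence this particular instance has seen.
        if (seen.add(eventKey)) {
            sink.add(eventKey);
        }
    }
}

final class DedupDemo {
    // Emits the same event on the controller path and on the FlinkService path;
    // returns how many events reach the shared sink.
    static int emitted(boolean shareOneRecorder) {
        List<String> sink = new ArrayList<>();
        DedupRecorder controllerPath = new DedupRecorder(sink);
        DedupRecorder servicePath = shareOneRecorder ? controllerPath : new DedupRecorder(sink);
        controllerPath.emit("my-session-job/example-event");
        servicePath.emit("my-session-job/example-event");
        return sink.size();
    }

    public static void main(String[] args) {
        System.out.println(emitted(true));  // 1
        System.out.println(emitted(false)); // 2
    }
}
```

With one shared instance the cache suppresses the repeat; with two instances each cache sees the event for the first time, so both copies get through.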

h1. Goal

Ensure a single operator-scoped {{EventRecorder}} is created once and reused by 
every controller and by the {{FlinkResourceContextFactory}}, removing the 
duplicate per-controller instances. The recorder stays a shared, fixed 
dependency owned by the factory, so it does not have to be passed on every 
{{getResourceContext}} call.
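A minimal sketch of the intended wiring, again with hypothetical stand-ins rather than the real {{FlinkOperator}} code. The {{getEventRecorder()}} accessor on the factory is an assumption made for illustration; the actual change may hand the single instance to the controllers in a different way.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for EventRecorder.
final class Recorder {
    private final List<String> sink; // stands in for the shared client and listeners

    Recorder(List<String> sink) {
        this.sink = sink;
    }

    void emit(String reason) {
        sink.add(reason);
    }
}

// Hypothetical stand-in for FlinkResourceContextFactory: owns the recorder as a
// fixed dependency, so getResourceContext(...) needs no recorder argument.
final class ContextFactory {
    private final Recorder eventRecorder;

    ContextFactory(Recorder eventRecorder) {
        this.eventRecorder = eventRecorder;
    }

    // Hypothetical accessor so controllers can reuse the same instance.
    Recorder getEventRecorder() {
        return eventRecorder;
    }
}

// Hypothetical stand-in for a controller that emits through the given recorder.
final class Controller {
    final String name;
    final Recorder eventRecorder;

    Controller(String name, Recorder eventRecorder) {
        this.name = name;
        this.eventRecorder = eventRecorder;
    }
}

// Hypothetical stand-in for FlinkOperator's wiring.
final class Operator {
    final ContextFactory ctxFactory;
    final List<Controller> controllers = new ArrayList<>();

    Operator() {
        // Created exactly once for the whole operator.
        Recorder eventRecorder = new Recorder(new ArrayList<>());
        ctxFactory = new ContextFactory(eventRecorder);
        for (String name : List.of("FlinkDeployment", "FlinkSessionJob", "FlinkStateSnapshot")) {
            // No per-controller EventRecorder.create(...): every controller reuses one instance.
            controllers.add(new Controller(name, ctxFactory.getEventRecorder()));
        }
    }

    boolean allShareOneRecorder() {
        for (Controller c : controllers) {
            if (c.eventRecorder != ctxFactory.getEventRecorder()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(new Operator().allShareOneRecorder()); // true
    }
}
```

Keeping the recorder on the factory means no {{getResourceContext}} call site has to change, which matches the behavior-preserving scope of the fix.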

h1. Notes

This is a behavior-preserving consistency fix. No public API, CRD, or 
reconciliation behavior changes. It is covered by the existing controller tests.

  was:
h1. Background

{{FlinkResourceContextFactory}} currently holds an {{EventRecorder}} as a 
constructor-injected field. That recorder is created once at {{FlinkOperator}} 
construction time and shared across every controller, regardless of which 
controller is calling {{getResourceContext(...)}}.

At the same time, {{FlinkSessionJobController}} and 
{{FlinkStateSnapshotController}} each create their own local {{EventRecorder}} 
via {{EventRecorder.create(client, listeners)}} during {{register*Controller}}, 
and {{FlinkBlueGreenDeploymentController}} has none at all. The result is an 
inconsistency:

* Reconcile code in a controller emits events through that controller's local 
recorder.
* The {{FlinkService}} built inside the same reconcile (via 
{{ctxFactory.getFlinkService(ctx)}}) emits events through the operator-scoped 
recorder held on the factory.

In other words, a single reconcile uses two different {{EventRecorder}} 
instances for events that conceptually belong to the same controller.

h1. Why this matters

Today {{EventRecorder}} is effectively stateless. Every public method takes the 
{{KubernetesClient}} as a parameter and forwards to {{EventUtils}}, and the 
only instance state is two {{BiConsumer}} closures over {{client}} and 
{{listeners}}. Two recorders constructed with the same {{client}} and the same 
{{listeners}} behave identically, so the asymmetry described above does not 
cause observable bugs in production right now.
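The statelessness argument can be checked with a small sketch. {{FakeClient}} and {{StatelessRecorder}} are hypothetical stand-ins for {{KubernetesClient}} and {{EventRecorder}}, and the two closures only approximate the real forwarding to {{EventUtils}}. Emitting the same event twice through two recorders built from the same client and listeners produces exactly what emitting it twice through one recorder produces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical stand-in for KubernetesClient: records what it would post.
final class FakeClient {
    final List<String> posted = new ArrayList<>();
}

// Hypothetical stand-in for EventRecorder: the only instance state is two
// BiConsumer closures capturing the client and the listeners.
final class StatelessRecorder {
    private final BiConsumer<String, String> toClient;
    private final BiConsumer<String, String> toListeners;

    private StatelessRecorder(
            BiConsumer<String, String> toClient, BiConsumer<String, String> toListeners) {
        this.toClient = toClient;
        this.toListeners = toListeners;
    }

    static StatelessRecorder create(FakeClient client, List<Consumer<String>> listeners) {
        return new StatelessRecorder(
                (resource, reason) -> client.posted.add(resource + ":" + reason),
                (resource, reason) -> listeners.forEach(l -> l.accept(resource + ":" + reason)));
    }

    void trigger(String resource, String reason) {
        toClient.accept(resource, reason);
        toListeners.accept(resource, reason);
    }
}

final class StatelessDemo {
    // Emits the same event twice, through one recorder or through two recorders
    // built from the same client and listeners; returns everything observed.
    static List<String> run(boolean twoInstances) {
        FakeClient client = new FakeClient();
        List<String> heard = new ArrayList<>();
        List<Consumer<String>> listeners = new ArrayList<>();
        listeners.add(heard::add);
        StatelessRecorder first = StatelessRecorder.create(client, listeners);
        StatelessRecorder second = twoInstances ? StatelessRecorder.create(client, listeners) : first;
        first.trigger("my-session-job", "Submit");
        second.trigger("my-session-job", "Submit");
        List<String> observed = new ArrayList<>(client.posted);
        observed.addAll(heard);
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(run(true).equals(run(false))); // true
    }
}
```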

It is, however, a latent bug. The moment any per-instance state is added to 
{{EventRecorder}} (per-controller dedup cache, per-controller listener subset, 
per-controller metrics, rate limiting), events emitted by {{FlinkService}} will 
silently route through the wrong recorder. That failure mode is hard to detect 
in review because the divergence spans two files and the type signatures look 
correct.

The current pattern (a local {{var eventRecorder = EventRecorder.create(...)}} in 
{{registerSessionJobController}} and {{registerSnapshotController}}) also misleads 
readers into thinking each controller owns its recorder end-to-end, when in 
fact the factory keeps using the operator-scoped one for {{FlinkService}}.

h1. Proposal

Thread {{EventRecorder}} through the context lifecycle instead of holding it on 
the factory.

* {{FlinkResourceContextFactory}} no longer takes an {{EventRecorder}} as a 
constructor argument.
* {{FlinkResourceContext}} (and its concrete subclasses) gains an 
{{EventRecorder}} field.
* {{getResourceContext(resource, josdkContext, eventRecorder)}} accepts the 
recorder per call and stores it on the context.
* {{getFlinkService(ctx)}} reads the recorder via {{ctx.getEventRecorder()}} 
instead of from a factory field.
* Each {{register*Controller}} method in {{FlinkOperator}} owns its own 
{{EventRecorder}} (including {{registerBlueGreenController}}, which gains one 
for the first time) and passes it into the controller. Every 
{{ctxFactory.getResourceContext(...)}} call site in production code passes the 
calling controller's recorder.

This mirrors how {{josdkContext}} is already handled, since both are 
per-reconcile inputs rather than factory-lifetime dependencies.
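The shape of this proposal can be sketched as follows. The method names {{getResourceContext}}, {{getFlinkService}} and {{getEventRecorder}} come from the list above, but the classes and parameter types are simplified, hypothetical stand-ins (the resource is a string and the JOSDK context a plain {{Object}}).

```java
// Hypothetical stand-in for EventRecorder, tagged with the controller that owns it.
final class Recorder {
    final String owner;

    Recorder(String owner) {
        this.owner = owner;
    }
}

// Hypothetical stand-in for FlinkService.
final class Service {
    final Recorder recorder;

    Service(Recorder recorder) {
        this.recorder = recorder;
    }
}

// Hypothetical stand-in for FlinkResourceContext: carries the recorder alongside
// the JOSDK context for the duration of one reconcile.
final class ResourceContext {
    final String resource;
    final Object josdkContext;
    private final Recorder eventRecorder;

    ResourceContext(String resource, Object josdkContext, Recorder eventRecorder) {
        this.resource = resource;
        this.josdkContext = josdkContext;
        this.eventRecorder = eventRecorder;
    }

    Recorder getEventRecorder() {
        return eventRecorder;
    }
}

// Hypothetical stand-in for FlinkResourceContextFactory under this proposal:
// no recorder field; the recorder arrives with each call.
final class ContextFactory {
    ResourceContext getResourceContext(String resource, Object josdkContext, Recorder eventRecorder) {
        return new ResourceContext(resource, josdkContext, eventRecorder);
    }

    Service getFlinkService(ResourceContext ctx) {
        // Reads the recorder from the context instead of from a factory field.
        return new Service(ctx.getEventRecorder());
    }
}

final class ProposalDemo {
    // Returns the recorder the service ends up using for a given caller recorder.
    static Recorder serviceRecorderFor(Recorder callerRecorder) {
        ContextFactory factory = new ContextFactory();
        ResourceContext ctx = factory.getResourceContext("my-session-job", new Object(), callerRecorder);
        return factory.getFlinkService(ctx).recorder;
    }

    public static void main(String[] args) {
        Recorder sessionJob = new Recorder("FlinkSessionJob");
        System.out.println(serviceRecorderFor(sessionJob) == sessionJob); // true
    }
}
```

The trade-off is that every call site must pick a recorder explicitly, which is what makes the asymmetric paths visible.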

h1. Benefits

* Removes the latent bug where {{FlinkService}}-emitted events would route 
through the wrong recorder once per-instance state is added.
* Aligns the runtime behavior with the mental model implied by the 
{{var eventRecorder = EventRecorder.create(...)}} pattern already present in 
{{registerSessionJobController}} and {{registerSnapshotController}}.
* Keeps shared singletons shared. The factory continues to own 
{{clientExecutorService}}, {{artifactManager}}, {{resourceMetricGroups}}, and 
{{lastRecordedExceptionCache}}, so no thread pools or caches get duplicated.
* Makes asymmetric paths (blue-green, snapshot operating on a 
{{FlinkDeployment}} it does not own) explicit rather than implicit, since each 
call site has to pick which recorder to pass.

h1. Out of scope

* No behavioral change is intended in this ticket. Existing tests must continue 
to pass without modification beyond the mechanical signature updates.
* Other follow-ups that build on this (per-controller dedup, richer listener 
wiring, event metrics) are deliberately not part of this change.



> Thread EventRecorder through FlinkResourceContext instead of holding it on 
> the factory
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-39752
>                 URL: https://issues.apache.org/jira/browse/FLINK-39752
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kubernetes Operator
>    Affects Versions: kubernetes-operator-1.15.0
>            Reporter: Dennis-Mircea Ciupitu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: kubernetes-operator-1.16.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
