Dennis-Mircea opened a new pull request, #1112:
URL: https://github.com/apache/flink-kubernetes-operator/pull/1112

   <!--
   *Thank you very much for contributing to the Apache Flink Kubernetes 
Operator - we are happy that you want to help us improve the project. To help 
the community review your contribution in the best possible way, please go 
through the checklist below, which will get the contribution into a shape in 
which it can be best reviewed.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the 
pull request", where *FLINK-XXXX* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
     Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix][docs] Fix typo in event time introduction` or 
`[hotfix][javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can read more on how we use GitHub Actions for CI 
[here](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/development/guide/#cicd).
   
     - Each pull request should address only one issue, not mix up code from 
multiple issues.
     
     - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and 
this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   The operator currently constructs a brand-new `PluginManager` four times 
during startup (one per consumer: metric reporters, validators, listeners, file 
systems), and the webhook constructs it twice more (validators, mutators). 
Every call rescans `/opt/flink/plugins`, builds a fresh isolated child 
`URLClassLoader` per plugin directory, and floods the startup logs with a 
duplicated `"Plugin loader with ID not found, creating it: <subdir>"` batch per 
call. This change consolidates discovery onto a single shared `PluginManager` 
per JVM, which removes the duplicate logs, drops redundant classloader 
allocations, and eliminates a latent classloader-identity hazard if a plugin 
class were ever loaded through two different SPIs.
   
   ## Brief change log
   
     - Added `OperatorPluginUtils` (in `flink-kubernetes-operator/utils/`) 
which owns the operator-wide plugin classloader policy. It exposes 
`enrichWithPluginParentFirstPatterns(Configuration)` and 
`createPluginManager(Configuration)`, and centralizes the `io.fabric8` / 
`com.fasterxml` parent-first patterns that previously lived in `ListenerUtils` 
only.
     - Added a 2-arg `(FlinkConfigManager, PluginManager)` overload to 
`ValidatorUtils.discoverValidators`, `ListenerUtils.discoverListeners`, 
`MutatorUtils.discoverMutators`, and a `(Configuration, PluginManager)` 
overload to `OperatorMetricUtils.initOperatorMetrics`. The existing single-arg 
overloads delegate to the new ones (creating a manager via 
`OperatorPluginUtils`) and are now annotated `@VisibleForTesting`, since after 
the refactor they have no production callers and exist only to keep test 
fixtures simple.
     - `FlinkOperator` constructor now creates one shared `PluginManager` via 
`OperatorPluginUtils.createPluginManager(baseConfig)` and threads it into all 
four consumers (`initOperatorMetrics`, `discoverValidators`, 
`discoverListeners`, `FileSystem.initialize`).
     - `FlinkOperatorWebhook` constructor mirrors the same pattern for 
`discoverValidators` and `discoverMutators`.
     - `ListenerUtils` no longer carries the `EXTRA_PARENT_FIRST_PATTERNS` 
constant or the `getListenerBaseConf` helper, the parent-first policy now 
applies process-wide via `OperatorPluginUtils`. Promoting it globally is 
intentional and safe: every plugin type (metric reporters, FS factories, 
validators, mutators, listeners) benefits from sharing the operator's fabric8 
client and Jackson, avoiding duplicate informers and version skew.
     - Added a one-line INFO header before each `pluginManager.load(...)` call 
so the Flink-internal `"Plugin loader with ID found, reusing it: <subdir>"` 
batches are clearly attributed to a specific SPI: `"Loading 
FlinkResourceValidator implementations from plugin directory."` 
(`ValidatorUtils`), `"Loading FlinkResourceListener ..."` (`ListenerUtils`), 
`"Loading FlinkResourceMutator ..."` (`MutatorUtils`), and `"Initializing file 
system factories from plugin directory."` (`FlinkOperator`, before 
`FileSystem.initialize`). The metrics path already prints `"Initializing 
operator metrics using conf: ..."` and needs no extra header.
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
`ListenerUtilsTest`, `ValidatorUtilsTest`, `MutatorUtilsTest`, 
`OperatorMetricUtilsTest`, `FlinkOperatorTest`, `AdmissionHandlerTest`, and 
`FlinkStateSnapshotControllerTest` (the latter exercises the preserved 
single-arg overload). All pass locally. Runtime behavior was verified from the 
test logs: `FlinkOperatorTest` now emits a single `Initializing operator 
metrics ...` line followed directly by `FileSystem` initialization, with no 
repeated plugin scanning batches.
   
   End-to-end verification on minikube against a real operator image with a 
populated `/opt/flink/plugins` confirms the same: the operator now logs exactly 
one `"Plugin loader with ID not found, creating it: <subdir>"` batch at startup 
(was 4 before this PR). The remaining `"Plugin loader with ID found, reusing 
it: <subdir>"` lines that still appear are emitted from inside Flink's 
`DefaultPluginManager.load(SomeSPI.class)` itself, once per registered loader 
per call, and are independent of how many `PluginManager` instances exist. They 
are out of scope for this PR (would require an upstream Flink log-level 
change), and operators that find them noisy can already silence them today via 
log4j:
   
   ```properties
   logger.pluginmanager.name = org.apache.flink.core.plugin.DefaultPluginManager
   logger.pluginmanager.level = WARN
   ```
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
     - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to