This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new e4c56b2b [FLINK-39688] Share a single PluginManager across operator 
and webhook startup (#1112)
e4c56b2b is described below

commit e4c56b2be3d844d4c12176f281972a8cc3de49b2
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Thu Jun 4 16:43:00 2026 +0300

    [FLINK-39688] Share a single PluginManager across operator and webhook 
startup (#1112)
---
 .../flink/kubernetes/operator/FlinkOperator.java   | 11 ++--
 .../operator/listener/ListenerUtils.java           | 30 ++--------
 .../operator/metrics/OperatorMetricUtils.java      | 10 +++-
 .../kubernetes/operator/utils/MutatorUtils.java    | 14 ++---
 .../operator/utils/OperatorPluginUtils.java        | 67 ++++++++++++++++++++++
 .../kubernetes/operator/utils/ValidatorUtils.java  |  8 ++-
 .../FlinkStateSnapshotControllerTest.java          |  6 +-
 .../TestingFlinkDeploymentController.java          |  6 +-
 .../TestingFlinkSessionJobController.java          |  6 +-
 .../operator/listener/ListenerUtilsTest.java       |  8 ++-
 .../operator/utils/MutatorUtilsTest.java           |  6 +-
 .../operator/utils/ValidatorUtilsTest.java         |  6 +-
 .../operator/admission/FlinkOperatorWebhook.java   |  8 ++-
 .../operator/admission/AdmissionHandlerTest.java   | 17 +++---
 14 files changed, 141 insertions(+), 62 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index cc15c5c8..d3ad2d14 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -21,7 +21,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginManager;
-import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
@@ -53,6 +52,7 @@ import 
org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorPluginUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
@@ -105,7 +105,8 @@ public class FlinkOperator {
                                 
KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class));
 
         baseConfig = configManager.getDefaultConfig();
-        this.metricGroup = OperatorMetricUtils.initOperatorMetrics(baseConfig);
+        PluginManager pluginManager = 
OperatorPluginUtils.createPluginManager(baseConfig);
+        this.metricGroup = OperatorMetricUtils.initOperatorMetrics(baseConfig, 
pluginManager);
         if (client == null) {
             this.client =
                     KubernetesClientUtils.getKubernetesClient(
@@ -114,12 +115,12 @@ public class FlinkOperator {
             this.client = client;
         }
         this.operator = createOperator();
-        this.validators = ValidatorUtils.discoverValidators(configManager);
-        this.listeners = ListenerUtils.discoverListeners(configManager);
+        this.validators = ValidatorUtils.discoverValidators(configManager, 
pluginManager);
+        this.listeners = ListenerUtils.discoverListeners(configManager, 
pluginManager);
         this.eventRecorder = EventRecorder.create(client, listeners);
         this.ctxFactory =
                 new FlinkResourceContextFactory(configManager, metricGroup, 
eventRecorder);
-        PluginManager pluginManager = 
PluginUtils.createPluginManagerFromRootFolder(baseConfig);
+        LOG.info("Initializing file system factories from plugin directory.");
         FileSystem.initialize(baseConfig, pluginManager);
         this.operatorHealthService = 
OperatorHealthService.fromConfig(configManager);
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
index 8ab3997c..0ee23599 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
@@ -22,9 +22,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DelegatingConfiguration;
-import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 
@@ -33,10 +32,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
@@ -53,8 +50,6 @@ public class ListenerUtils {
     private static final String SUFFIX = ".class";
     private static final Pattern PTN =
             Pattern.compile(Pattern.quote(PREFIX) + "([\\S&&[^.]]*)" + 
Pattern.quote(SUFFIX));
-    private static final List<String> EXTRA_PARENT_FIRST_PATTERNS =
-            List.of("io.fabric8", "com.fasterxml");
 
     /**
      * Load {@link FlinkResourceListener} implementations from the plugin 
directory. Only listeners
@@ -64,15 +59,17 @@ public class ListenerUtils {
      * kubernetes.operator.plugins.listeners.test.k1: v1
      *
      * @param configManager {@link FlinkConfigManager} to access plugin 
configurations.
+     * @param pluginManager shared {@link PluginManager} used for plugin 
discovery.
      * @return Enabled listeners.
      */
     public static Collection<FlinkResourceListener> discoverListeners(
-            FlinkConfigManager configManager) {
+            FlinkConfigManager configManager, PluginManager pluginManager) {
         var listeners = new ArrayList<FlinkResourceListener>();
-        var conf = getListenerBaseConf(configManager);
+        var conf = configManager.getDefaultConfig();
 
         Map<String, Configuration> listenerConfigs = loadListenerConfigs(conf);
-        PluginUtils.createPluginManagerFromRootFolder(conf)
+        LOG.info("Loading FlinkResourceListener implementations from plugin 
directory.");
+        pluginManager
                 .load(FlinkResourceListener.class)
                 .forEachRemaining(
                         listener -> {
@@ -96,21 +93,6 @@ public class ListenerUtils {
         return listeners;
     }
 
-    private static Configuration getListenerBaseConf(FlinkConfigManager 
configManager) {
-        var conf = new Configuration(configManager.getDefaultConfig());
-        List<String> additionalPatterns =
-                new ArrayList<>(
-                        conf.getOptional(
-                                        CoreOptions
-                                                
.PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL)
-                                .orElse(Collections.emptyList()));
-        additionalPatterns.addAll(EXTRA_PARENT_FIRST_PATTERNS);
-        conf.set(
-                
CoreOptions.PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
-                additionalPatterns);
-        return conf;
-    }
-
     @VisibleForTesting
     protected static Map<String, Configuration> 
loadListenerConfigs(Configuration configuration) {
         Map<String, Configuration> listenerConfigs = new HashMap<>();
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
index 58c97e14..a28e3212 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorMetricUtils.java
@@ -20,11 +20,11 @@ package org.apache.flink.kubernetes.operator.metrics;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.plugin.PluginManager;
-import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorPluginUtils;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
@@ -55,10 +55,16 @@ public class OperatorMetricUtils {
     private static final String OPERATOR_METRICS_PREFIX = K8S_OP_CONF_PREFIX + 
"metrics.";
     private static final String METRICS_PREFIX = "metrics.";
 
+    @VisibleForTesting
     public static KubernetesOperatorMetricGroup 
initOperatorMetrics(Configuration defaultConfig) {
+        return initOperatorMetrics(
+                defaultConfig, 
OperatorPluginUtils.createPluginManager(defaultConfig));
+    }
+
+    public static KubernetesOperatorMetricGroup initOperatorMetrics(
+            Configuration defaultConfig, PluginManager pluginManager) {
         Configuration metricConfig = createMetricConfig(defaultConfig);
         LOG.info("Initializing operator metrics using conf: {}", metricConfig);
-        PluginManager pluginManager = 
PluginUtils.createPluginManagerFromRootFolder(metricConfig);
         MetricRegistry metricRegistry = createMetricRegistry(metricConfig, 
pluginManager);
         KubernetesOperatorMetricGroup operatorMetricGroup =
                 KubernetesOperatorMetricGroup.create(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/MutatorUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/MutatorUtils.java
index 9f259379..c99f844e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/MutatorUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/MutatorUtils.java
@@ -18,7 +18,7 @@
 package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.mutator.DefaultFlinkMutator;
 import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
@@ -34,19 +34,15 @@ public final class MutatorUtils {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MutatorUtils.class);
 
-    /**
-     * discovers mutators.
-     *
-     * @param configManager Flink Config manager
-     * @return Set of FlinkResourceMutator
-     */
-    public static Set<FlinkResourceMutator> 
discoverMutators(FlinkConfigManager configManager) {
+    public static Set<FlinkResourceMutator> discoverMutators(
+            FlinkConfigManager configManager, PluginManager pluginManager) {
         var conf = configManager.getDefaultConfig();
         Set<FlinkResourceMutator> flinkmutator = new HashSet<>();
         DefaultFlinkMutator defaultFlinkMutator = new DefaultFlinkMutator();
         defaultFlinkMutator.configure(conf);
         flinkmutator.add(defaultFlinkMutator);
-        PluginUtils.createPluginManagerFromRootFolder(conf)
+        LOG.info("Loading FlinkResourceMutator implementations from plugin 
directory.");
+        pluginManager
                 .load(FlinkResourceMutator.class)
                 .forEachRemaining(
                         mutator -> {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorPluginUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorPluginUtils.java
new file mode 100644
index 00000000..b10884e0
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/OperatorPluginUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.PluginUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Builds the single, process-wide {@link PluginManager} used by the operator 
and webhook for plugin
+ * discovery (metric reporters, file systems, validators, mutators, listeners).
+ *
+ * <p>Plugin classloaders delegate {@code io.fabric8} and {@code 
com.fasterxml} to the parent so all
+ * plugins share the operator's fabric8 client and Jackson, avoiding duplicate 
informers and version
+ * skew.
+ */
+public final class OperatorPluginUtils {
+
+    private static final List<String> EXTRA_PARENT_FIRST_PATTERNS =
+            List.of("io.fabric8", "com.fasterxml");
+
+    private OperatorPluginUtils() {}
+
+    /** Returns a copy of {@code baseConf} with the operator's parent-first 
patterns merged in. */
+    private static Configuration 
enrichWithPluginParentFirstPatterns(Configuration baseConf) {
+        var conf = new Configuration(baseConf);
+        var patterns =
+                new ArrayList<>(
+                        conf.getOptional(
+                                        CoreOptions
+                                                
.PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL)
+                                .orElse(Collections.emptyList()));
+        for (var p : EXTRA_PARENT_FIRST_PATTERNS) {
+            if (!patterns.contains(p)) {
+                patterns.add(p);
+            }
+        }
+        
conf.set(CoreOptions.PLUGIN_ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL, 
patterns);
+        return conf;
+    }
+
+    /** Creates the shared {@link PluginManager} from {@code baseConf}, 
enriching it as needed. */
+    public static PluginManager createPluginManager(Configuration baseConf) {
+        return PluginUtils.createPluginManagerFromRootFolder(
+                enrichWithPluginParentFirstPatterns(baseConf));
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
index 0f21c81a..2cf97e08 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtils.java
@@ -18,7 +18,7 @@
 package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
@@ -36,13 +36,15 @@ public final class ValidatorUtils {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ValidatorUtils.class);
 
-    public static Set<FlinkResourceValidator> 
discoverValidators(FlinkConfigManager configManager) {
+    public static Set<FlinkResourceValidator> discoverValidators(
+            FlinkConfigManager configManager, PluginManager pluginManager) {
         var conf = configManager.getDefaultConfig();
         Set<FlinkResourceValidator> resourceValidators = new HashSet<>();
         DefaultValidator defaultValidator = new 
DefaultValidator(configManager);
         defaultValidator.configure(conf);
         resourceValidators.add(defaultValidator);
-        PluginUtils.createPluginManagerFromRootFolder(conf)
+        LOG.info("Loading FlinkResourceValidator implementations from plugin 
directory.");
+        pluginManager
                 .load(FlinkResourceValidator.class)
                 .forEachRemaining(
                         validator -> {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
index cfe69473..ce36f426 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
@@ -44,6 +44,7 @@ import 
org.apache.flink.kubernetes.operator.reconciler.snapshot.StateSnapshotRec
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.FlinkResourceEventCollector;
 import 
org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotEventCollector;
+import org.apache.flink.kubernetes.operator.utils.OperatorPluginUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
 
@@ -122,7 +123,10 @@ public class FlinkStateSnapshotControllerTest {
         statusRecorder = new StatusRecorder<>(metricManager, 
statusUpdateCounter);
         controller =
                 new FlinkStateSnapshotController(
-                        ValidatorUtils.discoverValidators(configManager),
+                        ValidatorUtils.discoverValidators(
+                                configManager,
+                                OperatorPluginUtils.createPluginManager(
+                                        configManager.getDefaultConfig())),
                         ctxFactory,
                         new StateSnapshotReconciler(ctxFactory, eventRecorder),
                         new StateSnapshotObserver(ctxFactory, eventRecorder),
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index af04e069..7c380727 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.kubernetes.operator.resources.ClusterResourceManager;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.FlinkResourceEventCollector;
 import 
org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotEventCollector;
+import org.apache.flink.kubernetes.operator.utils.OperatorPluginUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
 
@@ -104,7 +105,10 @@ public class TestingFlinkDeploymentController
         canaryResourceManager = new CanaryResourceManager<>(configManager);
         flinkDeploymentController =
                 new FlinkDeploymentController(
-                        ValidatorUtils.discoverValidators(configManager),
+                        ValidatorUtils.discoverValidators(
+                                configManager,
+                                OperatorPluginUtils.createPluginManager(
+                                        configManager.getDefaultConfig())),
                         contextFactory,
                         reconcilerFactory,
                         new FlinkDeploymentObserverFactory(eventRecorder),
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
index b135fba0..e2a42710 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkSessionJobController.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReco
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.FlinkResourceEventCollector;
 import 
org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotEventCollector;
+import org.apache.flink.kubernetes.operator.utils.OperatorPluginUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
 
@@ -90,7 +91,10 @@ public class TestingFlinkSessionJobController
 
         flinkSessionJobController =
                 new FlinkSessionJobController(
-                        ValidatorUtils.discoverValidators(configManager),
+                        ValidatorUtils.discoverValidators(
+                                configManager,
+                                OperatorPluginUtils.createPluginManager(
+                                        configManager.getDefaultConfig())),
                         ctxFactory,
                         new SessionJobReconciler(
                                 eventRecorder, statusRecorder, new 
NoopJobAutoscaler<>()),
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/ListenerUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/ListenerUtilsTest.java
index ba5ddc33..4325dacd 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/ListenerUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/listener/ListenerUtilsTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.utils.OperatorPluginUtils;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -56,10 +57,11 @@ public class ListenerUtilsTest {
                     ConfigConstants.ENV_FLINK_PLUGINS_DIR,
                     TestUtils.getTestPluginsRootDir(temporaryFolder));
             TestUtils.setEnv(systemEnv);
+            var configManager = new 
FlinkConfigManager(Configuration.fromMap(testConfig));
+            var pluginManager =
+                    
OperatorPluginUtils.createPluginManager(configManager.getDefaultConfig());
             var listeners =
-                    new ArrayList<>(
-                            ListenerUtils.discoverListeners(
-                                    new 
FlinkConfigManager(Configuration.fromMap(testConfig))));
+                    new 
ArrayList<>(ListenerUtils.discoverListeners(configManager, pluginManager));
             assertEquals(1, listeners.size());
 
             var testingListener = (TestingListener) listeners.get(0);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/MutatorUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/MutatorUtilsTest.java
index 4b840738..ce721bbb 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/MutatorUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/MutatorUtilsTest.java
@@ -52,13 +52,15 @@ public class MutatorUtilsTest {
                     ConfigConstants.ENV_FLINK_PLUGINS_DIR,
                     TestUtils.getTestPluginsRootDir(temporaryFolder));
             TestUtils.setEnv(systemEnv);
+            var configManager = new FlinkConfigManager(new Configuration());
+            var pluginManager =
+                    
OperatorPluginUtils.createPluginManager(configManager.getDefaultConfig());
             assertEquals(
                     new HashSet<>(
                             Arrays.asList(
                                     DefaultFlinkMutator.class.getName(),
                                     TestMutator.class.getName())),
-                    MutatorUtils.discoverMutators(new FlinkConfigManager(new 
Configuration()))
-                            .stream()
+                    MutatorUtils.discoverMutators(configManager, 
pluginManager).stream()
                             .map(v -> v.getClass().getName())
                             .collect(Collectors.toSet()));
         } finally {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtilsTest.java
index 70d8c9fd..b5959f64 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ValidatorUtilsTest.java
@@ -52,13 +52,15 @@ public class ValidatorUtilsTest {
                     ConfigConstants.ENV_FLINK_PLUGINS_DIR,
                     TestUtils.getTestPluginsRootDir(temporaryFolder));
             TestUtils.setEnv(systemEnv);
+            var configManager = new FlinkConfigManager(new Configuration());
+            var pluginManager =
+                    
OperatorPluginUtils.createPluginManager(configManager.getDefaultConfig());
             assertEquals(
                     new HashSet<>(
                             Arrays.asList(
                                     DefaultValidator.class.getName(),
                                     TestValidator.class.getName())),
-                    ValidatorUtils.discoverValidators(new 
FlinkConfigManager(new Configuration()))
-                            .stream()
+                    ValidatorUtils.discoverValidators(configManager, 
pluginManager).stream()
                             .map(v -> v.getClass().getName())
                             .collect(Collectors.toSet()));
         } finally {
diff --git 
a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
 
b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
index c616372f..92f0760f 100644
--- 
a/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
+++ 
b/flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/FlinkOperatorWebhook.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.admission;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
 import org.apache.flink.kubernetes.operator.api.FlinkStateSnapshot;
@@ -28,6 +29,7 @@ import 
org.apache.flink.kubernetes.operator.ssl.ReloadableSslContext;
 import org.apache.flink.kubernetes.operator.utils.EnvUtils;
 import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
 import org.apache.flink.kubernetes.operator.utils.MutatorUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorPluginUtils;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
 
@@ -83,8 +85,10 @@ public class FlinkOperatorWebhook {
             
informerManager.setNamespaces(operatorConfig.getWatchedNamespaces());
         }
 
-        this.validators = ValidatorUtils.discoverValidators(configManager);
-        this.mutators = MutatorUtils.discoverMutators(configManager);
+        PluginManager pluginManager =
+                
OperatorPluginUtils.createPluginManager(configManager.getDefaultConfig());
+        this.validators = ValidatorUtils.discoverValidators(configManager, 
pluginManager);
+        this.mutators = MutatorUtils.discoverMutators(configManager, 
pluginManager);
         this.admissionHandler =
                 new AdmissionHandler(
                         new FlinkValidator(validators, informerManager),
diff --git 
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
index e912baf1..a51ed7c3 100644
--- 
a/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
+++ 
b/flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.admission;
 
+import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.admission.mutator.FlinkMutator;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
@@ -27,6 +28,7 @@ import 
org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.utils.MutatorUtils;
+import org.apache.flink.kubernetes.operator.utils.OperatorPluginUtils;
 import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -67,15 +69,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @EnableKubernetesMockClient(crud = true)
 public class AdmissionHandlerTest {
 
+    private static final FlinkConfigManager CONFIG_MANAGER = new 
FlinkConfigManager(ns -> {}, true);
+    private static final PluginManager PLUGIN_MANAGER =
+            
OperatorPluginUtils.createPluginManager(CONFIG_MANAGER.getDefaultConfig());
+
     private KubernetesClient kubernetesClient;
     private AdmissionHandler admissionHandler =
             new AdmissionHandler(
                     new FlinkValidator(
-                            ValidatorUtils.discoverValidators(
-                                    new FlinkConfigManager(ns -> {}, true)),
+                            ValidatorUtils.discoverValidators(CONFIG_MANAGER, 
PLUGIN_MANAGER),
                             new InformerManager(null)),
                     new FlinkMutator(
-                            MutatorUtils.discoverMutators(new 
FlinkConfigManager(ns -> {}, true)),
+                            MutatorUtils.discoverMutators(CONFIG_MANAGER, 
PLUGIN_MANAGER),
                             new InformerManager(kubernetesClient)));
 
     @Test
@@ -149,12 +154,10 @@ public class AdmissionHandlerTest {
         admissionHandler =
                 new AdmissionHandler(
                         new FlinkValidator(
-                                ValidatorUtils.discoverValidators(
-                                        new FlinkConfigManager(ns -> {}, 
true)),
+                                
ValidatorUtils.discoverValidators(CONFIG_MANAGER, PLUGIN_MANAGER),
                                 new InformerManager(null)),
                         new FlinkMutator(
-                                MutatorUtils.discoverMutators(
-                                        new FlinkConfigManager(ns -> {}, 
true)),
+                                MutatorUtils.discoverMutators(CONFIG_MANAGER, 
PLUGIN_MANAGER),
                                 informerManager));
 
         final EmbeddedChannel embeddedChannel = new 
EmbeddedChannel(admissionHandler);

Reply via email to