This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 850c7098fd4021d50146ecd40c26816bfab66906
Author: Alexander Fedulov <[email protected]>
AuthorDate: Sat Jun 15 16:00:56 2019 +0200

    [FLINK-16222][runtime] Introduce PluginManager to ReporterSetup
---
 .../entrypoint/MesosJobClusterEntrypoint.java      |  5 +++--
 .../entrypoint/MesosSessionClusterEntrypoint.java  |  5 +++--
 .../mesos/entrypoint/MesosTaskExecutorRunner.java  |  7 +++++--
 .../runtime/entrypoint/ClusterEntrypoint.java      | 24 ++++++++++++----------
 .../flink/runtime/metrics/ReporterSetup.java       |  5 ++++-
 .../flink/runtime/minicluster/MiniCluster.java     |  2 +-
 .../runtime/taskexecutor/TaskManagerRunner.java    | 14 +++++++------
 .../runtime/metrics/MetricRegistryImplTest.java    |  2 +-
 .../flink/runtime/metrics/ReporterSetupTest.java   | 24 +++++++++++-----------
 .../taskexecutor/TaskManagerRunnerTest.java        |  5 ++++-
 ...tractTaskManagerProcessFailureRecoveryTest.java |  5 ++++-
 .../JobManagerHAProcessFailureRecoveryITCase.java  |  5 ++++-
 .../apache/flink/yarn/YarnTaskExecutorRunner.java  |  7 +++++--
 13 files changed, 67 insertions(+), 43 deletions(-)

diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 052c3aa..f51d903 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -20,6 +20,7 @@ package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.plugin.PluginManager;
 import 
org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerFactory;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
@@ -70,8 +71,8 @@ public class MesosJobClusterEntrypoint extends 
JobClusterEntrypoint {
        }
 
        @Override
-       protected void initializeServices(Configuration config) throws 
Exception {
-               super.initializeServices(config);
+       protected void initializeServices(Configuration config, PluginManager 
pluginManager) throws Exception {
+               super.initializeServices(config, pluginManager);
 
                final String hostname = 
config.getString(JobManagerOptions.ADDRESS);
 
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index d722356..6c6c64a 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -20,6 +20,7 @@ package org.apache.flink.mesos.entrypoint;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.plugin.PluginManager;
 import 
org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerFactory;
 import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices;
 import 
org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils;
@@ -67,8 +68,8 @@ public class MesosSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
        }
 
        @Override
-       protected void initializeServices(Configuration config) throws 
Exception {
-               super.initializeServices(config);
+       protected void initializeServices(Configuration config, PluginManager 
pluginManager) throws Exception {
+               super.initializeServices(config, pluginManager);
 
                final String hostname = 
config.getString(JobManagerOptions.ADDRESS);
 
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
index d87ab7b..92f4c13 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
@@ -21,6 +21,7 @@ package org.apache.flink.mesos.entrypoint;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
 import org.apache.flink.mesos.util.MesosUtils;
@@ -86,8 +87,10 @@ public class MesosTaskExecutorRunner {
 
                final Map<String, String> envs = System.getenv();
 
+               final PluginManager pluginManager = 
PluginUtils.createPluginManagerFromRootFolder(configuration);
+
                // configure the filesystems
-               FileSystem.initialize(configuration, 
PluginUtils.createPluginManagerFromRootFolder(configuration));
+               FileSystem.initialize(configuration, pluginManager);
 
                // tell akka to die in case of an error
                configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, 
true);
@@ -103,7 +106,7 @@ public class MesosTaskExecutorRunner {
 
                try {
                        SecurityUtils.getInstalledContext().runSecured(() -> {
-                               TaskManagerRunner.runTaskManager(configuration, 
resourceId);
+                               TaskManagerRunner.runTaskManager(configuration, 
resourceId, pluginManager);
 
                                return 0;
                        });
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 59d1eb6..8566ccd 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -159,13 +160,13 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
                LOG.info("Starting {}.", getClass().getSimpleName());
 
                try {
-
-                       configureFileSystems(configuration);
+                       PluginManager pluginManager = 
PluginUtils.createPluginManagerFromRootFolder(configuration);
+                       configureFileSystems(configuration, pluginManager);
 
                        SecurityContext securityContext = 
installSecurityContext(configuration);
 
                        securityContext.runSecured((Callable<Void>) () -> {
-                               runCluster(configuration);
+                               runCluster(configuration, pluginManager);
 
                                return null;
                        });
@@ -188,9 +189,9 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
                }
        }
 
-       private void configureFileSystems(Configuration configuration) {
+       private void configureFileSystems(Configuration configuration, 
PluginManager pluginManager) {
                LOG.info("Install default filesystem.");
-               FileSystem.initialize(configuration, 
PluginUtils.createPluginManagerFromRootFolder(configuration));
+               FileSystem.initialize(configuration, pluginManager);
        }
 
        private SecurityContext installSecurityContext(Configuration 
configuration) throws Exception {
@@ -201,9 +202,10 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
                return SecurityUtils.getInstalledContext();
        }
 
-       private void runCluster(Configuration configuration) throws Exception {
+       private void runCluster(Configuration configuration, PluginManager 
pluginManager) throws Exception {
                synchronized (lock) {
-                       initializeServices(configuration);
+
+                       initializeServices(configuration, pluginManager);
 
                        // write host information into configuration
                        configuration.setString(JobManagerOptions.ADDRESS, 
commonRpcService.getAddress());
@@ -242,7 +244,7 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
                }
        }
 
-       protected void initializeServices(Configuration configuration) throws 
Exception {
+       protected void initializeServices(Configuration configuration, 
PluginManager pluginManager) throws Exception {
 
                LOG.info("Initializing cluster services.");
 
@@ -265,7 +267,7 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
                        blobServer = new BlobServer(configuration, 
haServices.createBlobStore());
                        blobServer.start();
                        heartbeatServices = 
createHeartbeatServices(configuration);
-                       metricRegistry = createMetricRegistry(configuration);
+                       metricRegistry = createMetricRegistry(configuration, 
pluginManager);
 
                        final RpcService metricQueryServiceRpcService = 
MetricUtils.startMetricsRpcService(configuration, 
commonRpcService.getAddress());
                        
metricRegistry.startQueryService(metricQueryServiceRpcService, null);
@@ -308,10 +310,10 @@ public abstract class ClusterEntrypoint implements 
AutoCloseableAsync, FatalErro
                return HeartbeatServices.fromConfiguration(configuration);
        }
 
-       protected MetricRegistryImpl createMetricRegistry(Configuration 
configuration) {
+       protected MetricRegistryImpl createMetricRegistry(Configuration 
configuration, PluginManager pluginManager) {
                return new MetricRegistryImpl(
                        
MetricRegistryConfiguration.fromConfiguration(configuration),
-                       ReporterSetup.fromConfiguration(configuration));
+                       ReporterSetup.fromConfiguration(configuration, 
pluginManager));
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
index 62cb543..4fdc68a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.InstantiateViaFactory;
 import org.apache.flink.metrics.reporter.MetricReporter;
@@ -32,6 +33,8 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -125,7 +128,7 @@ public final class ReporterSetup {
                return new ReporterSetup(reporterName, metricConfig, reporter);
        }
 
-       public static List<ReporterSetup> fromConfiguration(final Configuration 
configuration) {
+       public static List<ReporterSetup> fromConfiguration(final Configuration 
configuration, @Nullable final PluginManager pluginManager) {
                String includedReportersString = 
configuration.getString(MetricOptions.REPORTERS_LIST, "");
 
                Set<String> namedReporters = 
findEnabledReportersInConfiguration(configuration, includedReportersString);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 41e3bdf..4f4316f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -721,7 +721,7 @@ public class MiniCluster implements JobExecutorService, 
AutoCloseableAsync {
        protected MetricRegistryImpl createMetricRegistry(Configuration config) 
{
                return new MetricRegistryImpl(
                        MetricRegistryConfiguration.fromConfiguration(config),
-                       ReporterSetup.fromConfiguration(config));
+                       ReporterSetup.fromConfiguration(config, null));
        }
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 0478953..6fbd874 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -118,7 +119,7 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
 
        private boolean shutdown;
 
-       public TaskManagerRunner(Configuration configuration, ResourceID 
resourceId) throws Exception {
+       public TaskManagerRunner(Configuration configuration, ResourceID 
resourceId, PluginManager pluginManager) throws Exception {
                this.configuration = checkNotNull(configuration);
                this.resourceId = checkNotNull(resourceId);
 
@@ -139,7 +140,7 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
 
                metricRegistry = new MetricRegistryImpl(
                        
MetricRegistryConfiguration.fromConfiguration(configuration),
-                       ReporterSetup.fromConfiguration(configuration));
+                       ReporterSetup.fromConfiguration(configuration, 
pluginManager));
 
                final RpcService metricQueryServiceRpcService = 
MetricUtils.startMetricsRpcService(configuration, rpcService.getAddress());
                metricRegistry.startQueryService(metricQueryServiceRpcService, 
resourceId);
@@ -307,8 +308,8 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
                return 
GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir(), 
dynamicProperties);
        }
 
-       public static void runTaskManager(Configuration configuration, 
ResourceID resourceId) throws Exception {
-               final TaskManagerRunner taskManagerRunner = new 
TaskManagerRunner(configuration, resourceId);
+       public static void runTaskManager(Configuration configuration, 
ResourceID resourceId, PluginManager pluginManager) throws Exception {
+               final TaskManagerRunner taskManagerRunner = new 
TaskManagerRunner(configuration, resourceId, pluginManager);
 
                taskManagerRunner.start();
        }
@@ -317,12 +318,13 @@ public class TaskManagerRunner implements 
FatalErrorHandler, AutoCloseableAsync
                try {
                        final Configuration configuration = 
loadConfiguration(args);
 
-                       FileSystem.initialize(configuration, 
PluginUtils.createPluginManagerFromRootFolder(configuration));
+                       final PluginManager pluginManager = 
PluginUtils.createPluginManagerFromRootFolder(configuration);
+                       FileSystem.initialize(configuration, pluginManager);
 
                        SecurityUtils.install(new 
SecurityConfiguration(configuration));
 
                        SecurityUtils.getInstalledContext().runSecured(() -> {
-                               runTaskManager(configuration, resourceID);
+                               runTaskManager(configuration, resourceID, 
pluginManager);
                                return null;
                        });
                } catch (Throwable t) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
index 0f69aca..ed9d913 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
@@ -280,7 +280,7 @@ public class MetricRegistryImplTest extends TestLogger {
                config.setString(MetricOptions.SCOPE_DELIMITER, "_");
                config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E");
 
-               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config), 
ReporterSetup.fromConfiguration(config));
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config), 
ReporterSetup.fromConfiguration(config, null));
 
                TaskManagerMetricGroup tmGroup = new 
TaskManagerMetricGroup(registry, "host", "id");
                assertEquals("A_B_C_D_E_name", 
tmGroup.getMetricIdentifier("name"));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
index bd1715b..f687c66 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
@@ -68,7 +68,7 @@ public class ReporterSetupTest extends TestLogger {
 
                configureReporter1(config);
 
-               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config);
+               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config, null);
 
                Assert.assertEquals(1, reporterSetups.size());
 
@@ -86,7 +86,7 @@ public class ReporterSetupTest extends TestLogger {
                configureReporter1(config);
                configureReporter2(config);
 
-               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config);
+               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config, null);
 
                Assert.assertEquals(2, reporterSetups.size());
 
@@ -117,7 +117,7 @@ public class ReporterSetupTest extends TestLogger {
 
                config.setString(MetricOptions.REPORTERS_LIST, "reporter2");
 
-               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config);
+               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config, null);
 
                Assert.assertEquals(1, reporterSetups.size());
 
@@ -131,7 +131,7 @@ public class ReporterSetupTest extends TestLogger {
 
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"reporter1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
 
-               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config);
+               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config, null);
 
                Assert.assertEquals(1, reporterSetups.size());
 
@@ -151,7 +151,7 @@ public class ReporterSetupTest extends TestLogger {
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter12.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter13.class.getName());
 
-               List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config);
+               List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config, null);
 
                assertEquals(3, reporterSetups.size());
 
@@ -230,7 +230,7 @@ public class ReporterSetupTest extends TestLogger {
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, 
TestReporterFactory.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_EXCLUDED_VARIABLES, 
excludedVariable1 + ";" + excludedVariable2);
 
-               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config);
+               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config, null);
 
                assertEquals(1, reporterSetups.size());
 
@@ -247,7 +247,7 @@ public class ReporterSetupTest extends TestLogger {
                final Configuration config = new Configuration();
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, 
TestReporterFactory.class.getName());
 
-               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config);
+               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config, null);
 
                assertEquals(1, reporterSetups.size());
 
@@ -265,7 +265,7 @@ public class ReporterSetupTest extends TestLogger {
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, 
InstantiationTypeTrackingTestReporterFactory.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
InstantiationTypeTrackingTestReporter.class.getName());
 
-               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config);
+               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config, null);
 
                assertEquals(1, reporterSetups.size());
 
@@ -284,7 +284,7 @@ public class ReporterSetupTest extends TestLogger {
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, 
TestReporterFactory.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"fail." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, 
FailingFactory.class.getName());
 
-               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config);
+               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config, null);
 
                assertEquals(1, reporterSetups.size());
        }
@@ -298,7 +298,7 @@ public class ReporterSetupTest extends TestLogger {
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, 
InstantiationTypeTrackingTestReporterFactory.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
InstantiationTypeTrackingTestReporter.class.getName());
 
-               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config);
+               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config, null);
 
                assertEquals(2, reporterSetups.size());
 
@@ -317,7 +317,7 @@ public class ReporterSetupTest extends TestLogger {
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, 
ConfigExposingReporterFactory.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg", "hello");
 
-               ReporterSetup.fromConfiguration(config);
+               ReporterSetup.fromConfiguration(config, null);
 
                Properties passedConfig = 
ConfigExposingReporterFactory.lastConfig;
                assertEquals("hello", passedConfig.getProperty("arg"));
@@ -331,7 +331,7 @@ public class ReporterSetupTest extends TestLogger {
                final Configuration config = new Configuration();
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
InstantiationTypeTrackingTestReporter2.class.getName());
 
-               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config);
+               final List<ReporterSetup> reporterSetups = 
ReporterSetup.fromConfiguration(config, null);
 
                assertEquals(1, reporterSetups.size());
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
index 3ce10f6..2af1319 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager;
 import org.apache.flink.util.TestLogger;
@@ -92,7 +94,8 @@ public class TaskManagerRunnerTest extends TestLogger {
        }
 
        private static TaskManagerRunner createTaskManagerRunner(final 
Configuration configuration) throws Exception {
-               TaskManagerRunner taskManagerRunner = new 
TaskManagerRunner(configuration, ResourceID.generate());
+               final PluginManager pluginManager = 
PluginUtils.createPluginManagerFromRootFolder(configuration);
+               TaskManagerRunner taskManagerRunner = new 
TaskManagerRunner(configuration, ResourceID.generate(), pluginManager);
                taskManagerRunner.start();
                return taskManagerRunner;
        }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index d9c8230..f71d193 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
@@ -321,8 +323,9 @@ public abstract class 
AbstractTaskManagerProcessFailureRecoveryTest extends Test
                        try {
                                final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
                                Configuration cfg = 
parameterTool.getConfiguration();
+                               final PluginManager pluginManager = 
PluginUtils.createPluginManagerFromRootFolder(cfg);
 
-                               TaskManagerRunner.runTaskManager(cfg, 
ResourceID.generate());
+                               TaskManagerRunner.runTaskManager(cfg, 
ResourceID.generate(), pluginManager);
                        }
                        catch (Throwable t) {
                                LOG.error("Failed to start TaskManager 
process", t);
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
index 8400bb4..325ed19 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
@@ -32,6 +32,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
@@ -269,9 +271,10 @@ public class JobManagerHAProcessFailureRecoveryITCase 
extends TestLogger {
                                config,
                                TestingUtils.defaultExecutor());
 
+                       final PluginManager pluginManager = 
PluginUtils.createPluginManagerFromRootFolder(config);
                        // Start the task manager process
                        for (int i = 0; i < numberOfTaskManagers; i++) {
-                               taskManagerRunners[i] = new 
TaskManagerRunner(config, ResourceID.generate());
+                               taskManagerRunners[i] = new 
TaskManagerRunner(config, ResourceID.generate(), pluginManager);
                                taskManagerRunners[i].start();
                        }
 
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index bf28974..78b4187 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -92,7 +93,9 @@ public class YarnTaskExecutorRunner {
 
                        final Configuration configuration = 
TaskManagerRunner.loadConfiguration(args);
 
-                       FileSystem.initialize(configuration, 
PluginUtils.createPluginManagerFromRootFolder(configuration));
+                       final PluginManager pluginManager = 
PluginUtils.createPluginManagerFromRootFolder(configuration);
+
+                       FileSystem.initialize(configuration, pluginManager);
 
                        
setupConfigurationAndInstallSecurityContext(configuration, currDir, ENV);
 
@@ -101,7 +104,7 @@ public class YarnTaskExecutorRunner {
                                "ContainerId variable %s not set", 
YarnResourceManager.ENV_FLINK_CONTAINER_ID);
 
                        
SecurityUtils.getInstalledContext().runSecured((Callable<Void>) () -> {
-                               TaskManagerRunner.runTaskManager(configuration, 
new ResourceID(containerId));
+                               TaskManagerRunner.runTaskManager(configuration, 
new ResourceID(containerId), pluginManager);
                                return null;
                        });
                }

Reply via email to