This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 850c7098fd4021d50146ecd40c26816bfab66906 Author: Alexander Fedulov <[email protected]> AuthorDate: Sat Jun 15 16:00:56 2019 +0200 [FLINK-16222][runtime] Introduce PluginManager to ReporterSetup --- .../entrypoint/MesosJobClusterEntrypoint.java | 5 +++-- .../entrypoint/MesosSessionClusterEntrypoint.java | 5 +++-- .../mesos/entrypoint/MesosTaskExecutorRunner.java | 7 +++++-- .../runtime/entrypoint/ClusterEntrypoint.java | 24 ++++++++++++---------- .../flink/runtime/metrics/ReporterSetup.java | 5 ++++- .../flink/runtime/minicluster/MiniCluster.java | 2 +- .../runtime/taskexecutor/TaskManagerRunner.java | 14 +++++++------ .../runtime/metrics/MetricRegistryImplTest.java | 2 +- .../flink/runtime/metrics/ReporterSetupTest.java | 24 +++++++++++----------- .../taskexecutor/TaskManagerRunnerTest.java | 5 ++++- ...tractTaskManagerProcessFailureRecoveryTest.java | 5 ++++- .../JobManagerHAProcessFailureRecoveryITCase.java | 5 ++++- .../apache/flink/yarn/YarnTaskExecutorRunner.java | 7 +++++-- 13 files changed, 67 insertions(+), 43 deletions(-) diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java index 052c3aa..f51d903 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java @@ -20,6 +20,7 @@ package org.apache.flink.mesos.entrypoint; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.plugin.PluginManager; import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerFactory; import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices; import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils; @@ -70,8 +71,8 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { } @Override - protected void initializeServices(Configuration config) throws Exception { - super.initializeServices(config); + protected void initializeServices(Configuration config, PluginManager pluginManager) throws Exception { + super.initializeServices(config, pluginManager); final String hostname = config.getString(JobManagerOptions.ADDRESS); diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java index d722356..6c6c64a 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java @@ -20,6 +20,7 @@ package org.apache.flink.mesos.entrypoint; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.core.plugin.PluginManager; import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManagerFactory; import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices; import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils; @@ -67,8 +68,8 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint { } @Override - protected void initializeServices(Configuration config) throws Exception { - super.initializeServices(config); + protected void initializeServices(Configuration config, PluginManager pluginManager) throws Exception { + super.initializeServices(config, pluginManager); final String hostname = config.getString(JobManagerOptions.ADDRESS); diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java index d87ab7b..92f4c13 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java @@ -21,6 +21,7 @@ package org.apache.flink.mesos.entrypoint; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginManager; import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys; import org.apache.flink.mesos.util.MesosUtils; @@ -86,8 +87,10 @@ public class MesosTaskExecutorRunner { final Map<String, String> envs = System.getenv(); + final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration); + // configure the filesystems - FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration)); + FileSystem.initialize(configuration, pluginManager); // tell akka to die in case of an error configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); @@ -103,7 +106,7 @@ public class MesosTaskExecutorRunner { try { SecurityUtils.getInstalledContext().runSecured(() -> { - TaskManagerRunner.runTaskManager(configuration, resourceId); + TaskManagerRunner.runTaskManager(configuration, resourceId, pluginManager); return 0; }); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index 59d1eb6..8566ccd 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -30,6 +30,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginManager; import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.clusterframework.ApplicationStatus; @@ -159,13 +160,13 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro LOG.info("Starting {}.", getClass().getSimpleName()); try { - - configureFileSystems(configuration); + PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration); + configureFileSystems(configuration, pluginManager); SecurityContext securityContext = installSecurityContext(configuration); securityContext.runSecured((Callable<Void>) () -> { - runCluster(configuration); + runCluster(configuration, pluginManager); return null; }); @@ -188,9 +189,9 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro } } - private void configureFileSystems(Configuration configuration) { + private void configureFileSystems(Configuration configuration, PluginManager pluginManager) { LOG.info("Install default filesystem."); - FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration)); + FileSystem.initialize(configuration, pluginManager); } private SecurityContext installSecurityContext(Configuration configuration) throws Exception { @@ -201,9 +202,10 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro return SecurityUtils.getInstalledContext(); } - private void runCluster(Configuration configuration) throws Exception { + private void runCluster(Configuration configuration, PluginManager pluginManager) throws Exception { synchronized (lock) { - initializeServices(configuration); + + initializeServices(configuration, pluginManager); // write host information into configuration configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress()); @@ -242,7 +244,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro } } - protected void initializeServices(Configuration configuration) throws Exception { + protected void initializeServices(Configuration configuration, PluginManager pluginManager) throws Exception { LOG.info("Initializing cluster services."); @@ -265,7 +267,7 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro blobServer = new BlobServer(configuration, haServices.createBlobStore()); blobServer.start(); heartbeatServices = createHeartbeatServices(configuration); - metricRegistry = createMetricRegistry(configuration); + metricRegistry = createMetricRegistry(configuration, pluginManager); final RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(configuration, commonRpcService.getAddress()); metricRegistry.startQueryService(metricQueryServiceRpcService, null); @@ -308,10 +310,10 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro return HeartbeatServices.fromConfiguration(configuration); } - protected MetricRegistryImpl createMetricRegistry(Configuration configuration) { + protected MetricRegistryImpl createMetricRegistry(Configuration configuration, PluginManager pluginManager) { return new MetricRegistryImpl( MetricRegistryConfiguration.fromConfiguration(configuration), - ReporterSetup.fromConfiguration(configuration)); + ReporterSetup.fromConfiguration(configuration, pluginManager)); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java index 62cb543..4fdc68a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.core.plugin.PluginManager; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.reporter.InstantiateViaFactory; import org.apache.flink.metrics.reporter.MetricReporter; @@ -32,6 +33,8 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -125,7 +128,7 @@ public final class ReporterSetup { return new ReporterSetup(reporterName, metricConfig, reporter); } - public static List<ReporterSetup> fromConfiguration(final Configuration configuration) { + public static List<ReporterSetup> fromConfiguration(final Configuration configuration, @Nullable final PluginManager pluginManager) { String includedReportersString = configuration.getString(MetricOptions.REPORTERS_LIST, ""); Set<String> namedReporters = findEnabledReportersInConfiguration(configuration, includedReportersString); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 41e3bdf..4f4316f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -721,7 +721,7 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync { protected MetricRegistryImpl createMetricRegistry(Configuration config) { return new MetricRegistryImpl( MetricRegistryConfiguration.fromConfiguration(config), - ReporterSetup.fromConfiguration(config)); + ReporterSetup.fromConfiguration(config, null)); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 0478953..6fbd874 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginManager; import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.akka.AkkaUtils; @@ -118,7 +119,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync private boolean shutdown; - public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception { + public TaskManagerRunner(Configuration configuration, ResourceID resourceId, PluginManager pluginManager) throws Exception { this.configuration = checkNotNull(configuration); this.resourceId = checkNotNull(resourceId); @@ -139,7 +140,7 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync metricRegistry = new MetricRegistryImpl( MetricRegistryConfiguration.fromConfiguration(configuration), - ReporterSetup.fromConfiguration(configuration)); + ReporterSetup.fromConfiguration(configuration, pluginManager)); final RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(configuration, rpcService.getAddress()); metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId); @@ -307,8 +308,8 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir(), dynamicProperties); } - public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception { - final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId); + public static void runTaskManager(Configuration configuration, ResourceID resourceId, PluginManager pluginManager) throws Exception { + final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId, pluginManager); taskManagerRunner.start(); } @@ -317,12 +318,13 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync try { final Configuration configuration = loadConfiguration(args); - FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration)); + final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration); + FileSystem.initialize(configuration, pluginManager); SecurityUtils.install(new SecurityConfiguration(configuration)); SecurityUtils.getInstalledContext().runSecured(() -> { - runTaskManager(configuration, resourceID); + runTaskManager(configuration, resourceID, pluginManager); return null; }); } catch (Throwable t) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java index 0f69aca..ed9d913 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java @@ -280,7 +280,7 @@ public class MetricRegistryImplTest extends TestLogger { config.setString(MetricOptions.SCOPE_DELIMITER, "_"); config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E"); - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config), ReporterSetup.fromConfiguration(config)); + MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config), ReporterSetup.fromConfiguration(config, null)); TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id"); assertEquals("A_B_C_D_E_name", tmGroup.getMetricIdentifier("name")); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java index bd1715b..f687c66 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java @@ -68,7 +68,7 @@ public class ReporterSetupTest extends TestLogger { configureReporter1(config); - final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config); + final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null); Assert.assertEquals(1, reporterSetups.size()); @@ -86,7 +86,7 @@ public class ReporterSetupTest extends TestLogger { configureReporter1(config); configureReporter2(config); - final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config); + final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null); Assert.assertEquals(2, reporterSetups.size()); @@ -117,7 +117,7 @@ public class ReporterSetupTest extends TestLogger { config.setString(MetricOptions.REPORTERS_LIST, "reporter2"); - final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config); + final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null); Assert.assertEquals(1, reporterSetups.size()); @@ -131,7 +131,7 @@ public class ReporterSetupTest extends TestLogger { config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); - final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config); + final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null); Assert.assertEquals(1, reporterSetups.size()); @@ -151,7 +151,7 @@ public class ReporterSetupTest extends TestLogger { config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName()); - List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config); + List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null); assertEquals(3, reporterSetups.size()); @@ -230,7 +230,7 @@ public class ReporterSetupTest extends TestLogger { config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, TestReporterFactory.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_EXCLUDED_VARIABLES, excludedVariable1 + ";" + excludedVariable2); - final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config); + final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null); assertEquals(1, reporterSetups.size()); @@ -247,7 +247,7 @@ public class ReporterSetupTest extends TestLogger { final Configuration config = new Configuration(); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, TestReporterFactory.class.getName()); - final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config); + final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null); assertEquals(1, reporterSetups.size()); @@ -265,7 +265,7 @@ public class ReporterSetupTest extends TestLogger { config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, InstantiationTypeTrackingTestReporterFactory.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, InstantiationTypeTrackingTestReporter.class.getName()); - final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config); + final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null); assertEquals(1, reporterSetups.size()); @@ -284,7 +284,7 @@ public class ReporterSetupTest extends TestLogger { config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, TestReporterFactory.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "fail." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, FailingFactory.class.getName()); - final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config); + final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null); assertEquals(1, reporterSetups.size()); } @@ -298,7 +298,7 @@ public class ReporterSetupTest extends TestLogger { config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, InstantiationTypeTrackingTestReporterFactory.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, InstantiationTypeTrackingTestReporter.class.getName()); - final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config); + final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null); assertEquals(2, reporterSetups.size()); @@ -317,7 +317,7 @@ public class ReporterSetupTest extends TestLogger { config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, ConfigExposingReporterFactory.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg", "hello"); - ReporterSetup.fromConfiguration(config); + ReporterSetup.fromConfiguration(config, null); Properties passedConfig = ConfigExposingReporterFactory.lastConfig; assertEquals("hello", passedConfig.getProperty("arg")); @@ -331,7 +331,7 @@ public class ReporterSetupTest extends TestLogger { final Configuration config = new Configuration(); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, InstantiationTypeTrackingTestReporter2.class.getName()); - final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config); + final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null); assertEquals(1, reporterSetups.size()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java index 3ce10f6..2af1319 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerTest.java @@ -21,6 +21,8 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.testutils.SystemExitTrackingSecurityManager; import org.apache.flink.util.TestLogger; @@ -92,7 +94,8 @@ public class TaskManagerRunnerTest extends TestLogger { } private static TaskManagerRunner createTaskManagerRunner(final Configuration configuration) throws Exception { - TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, ResourceID.generate()); + final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration); + TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, ResourceID.generate(), pluginManager); taskManagerRunner.start(); return taskManagerRunner; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index d9c8230..f71d193 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -27,6 +27,8 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint; import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; @@ -321,8 +323,9 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test try { final ParameterTool parameterTool = ParameterTool.fromArgs(args); Configuration cfg = parameterTool.getConfiguration(); + final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(cfg); - TaskManagerRunner.runTaskManager(cfg, ResourceID.generate()); + TaskManagerRunner.runTaskManager(cfg, ResourceID.generate(), pluginManager); } catch (Throwable t) { LOG.error("Failed to start TaskManager process", t); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java index 8400bb4..325ed19 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java @@ -32,6 +32,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; @@ -269,9 +271,10 @@ public class JobManagerHAProcessFailureRecoveryITCase extends TestLogger { config, TestingUtils.defaultExecutor()); + final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(config); // Start the task manager process for (int i = 0; i < numberOfTaskManagers; i++) { - taskManagerRunners[i] = new TaskManagerRunner(config, ResourceID.generate()); + taskManagerRunners[i] = new TaskManagerRunner(config, ResourceID.generate(), pluginManager); taskManagerRunners[i].start(); } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java index bf28974..78b4187 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginManager; import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -92,7 +93,9 @@ public class YarnTaskExecutorRunner { final Configuration configuration = TaskManagerRunner.loadConfiguration(args); - FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration)); + final PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration); + + FileSystem.initialize(configuration, pluginManager); setupConfigurationAndInstallSecurityContext(configuration, currDir, ENV); @@ -101,7 +104,7 @@ public class YarnTaskExecutorRunner { "ContainerId variable %s not set", YarnResourceManager.ENV_FLINK_CONTAINER_ID); SecurityUtils.getInstalledContext().runSecured((Callable<Void>) () -> { - TaskManagerRunner.runTaskManager(configuration, new ResourceID(containerId)); + TaskManagerRunner.runTaskManager(configuration, new ResourceID(containerId), pluginManager); return null; }); }
