This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new e5e6093 YARN-9140. Code cleanup in ResourcePluginManager.initialize
and in TestResourcePluginManager. Contributed by Peter Bacsko
e5e6093 is described below
commit e5e609384f68cc45b0c2bfbde0a49426c90017d3
Author: Szilard Nemeth <[email protected]>
AuthorDate: Wed Aug 14 16:58:22 2019 +0200
YARN-9140. Code cleanup in ResourcePluginManager.initialize and in
TestResourcePluginManager. Contributed by Peter Bacsko
---
.../resourceplugin/ResourcePluginManager.java | 84 +++++++++++++--------
.../resourceplugin/TestResourcePluginManager.java | 86 ++++++++++------------
2 files changed, 91 insertions(+), 79 deletions(-)
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
index 1274b64..84cdd7a 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/ResourcePluginManager.java
@@ -20,6 +20,7 @@ package
org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugi
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
@@ -44,7 +45,6 @@ import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -68,8 +68,29 @@ public class ResourcePluginManager {
public void initialize(Context context)
throws YarnException, ClassNotFoundException {
Configuration conf = context.getConf();
- Map<String, ResourcePlugin> pluginMap = new HashMap<>();
+ String[] plugins = getPluginsFromConfig(conf);
+ Map<String, ResourcePlugin> pluginMap = Maps.newHashMap();
+ if (plugins != null) {
+ pluginMap = initializePlugins(context, plugins);
+ }
+
+ // Try to load pluggable device plugins
+ boolean pluggableDeviceFrameworkEnabled = conf.getBoolean(
+ YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
+ YarnConfiguration.DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED);
+
+ if (pluggableDeviceFrameworkEnabled) {
+ initializePluggableDevicePlugins(context, conf, pluginMap);
+ } else {
+ LOG.info("The pluggable device framework is not enabled."
+ + " If you want, please set true to {}",
+ YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED);
+ }
+ configuredPlugins = Collections.unmodifiableMap(pluginMap);
+ }
+
+ private String[] getPluginsFromConfig(Configuration conf) {
String[] plugins = conf.getStrings(YarnConfiguration.NM_RESOURCE_PLUGINS);
if (plugins == null || plugins.length == 0) {
LOG.info("No Resource plugins found from configuration!");
@@ -77,25 +98,18 @@ public class ResourcePluginManager {
LOG.info("Found Resource plugins from configuration: "
+ Arrays.toString(plugins));
- if (plugins != null) {
- // Initialize each plugins
- for (String resourceName : plugins) {
- resourceName = resourceName.trim();
- if (!SUPPORTED_RESOURCE_PLUGINS.contains(resourceName)) {
- String msg =
- "Trying to initialize resource plugin with name=" + resourceName
- + ", it is not supported, list of supported plugins:"
- + StringUtils.join(",", SUPPORTED_RESOURCE_PLUGINS);
- LOG.error(msg);
- throw new YarnException(msg);
- }
+ return plugins;
+ }
- if (pluginMap.containsKey(resourceName)) {
- LOG.warn("Ignoring duplicate Resource plugin definition: " +
- resourceName);
- continue;
- }
+ private Map<String, ResourcePlugin> initializePlugins(
+ Context context, String[] plugins) throws YarnException {
+ Map<String, ResourcePlugin> pluginMap = Maps.newHashMap();
+ for (String resourceName : plugins) {
+ resourceName = resourceName.trim();
+ ensurePluginIsSupported(resourceName);
+
+ if (!isPluginDuplicate(pluginMap, resourceName)) {
ResourcePlugin plugin = null;
if (resourceName.equals(GPU_URI)) {
final GpuDiscoverer gpuDiscoverer = new GpuDiscoverer();
@@ -116,19 +130,29 @@ public class ResourcePluginManager {
pluginMap.put(resourceName, plugin);
}
}
- // Try to load pluggable device plugins
- boolean puggableDeviceFrameworkEnabled = conf.getBoolean(
- YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
- YarnConfiguration.DEFAULT_NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED);
+ return pluginMap;
+ }
- if (puggableDeviceFrameworkEnabled) {
- initializePluggableDevicePlugins(context, conf, pluginMap);
- } else {
- LOG.info("The pluggable device framework is not enabled."
- + " If you want, please set true to {}",
- YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED);
+ private void ensurePluginIsSupported(String resourceName)
+ throws YarnException {
+ if (!SUPPORTED_RESOURCE_PLUGINS.contains(resourceName)) {
+ String msg =
+ "Trying to initialize resource plugin with name=" + resourceName
+ + ", it is not supported, list of supported plugins:"
+ + StringUtils.join(",", SUPPORTED_RESOURCE_PLUGINS);
+ LOG.error(msg);
+ throw new YarnException(msg);
}
- configuredPlugins = Collections.unmodifiableMap(pluginMap);
+ }
+
+ private boolean isPluginDuplicate(Map<String, ResourcePlugin> pluginMap,
+ String resourceName) {
+ if (pluginMap.containsKey(resourceName)) {
+ LOG.warn("Ignoring duplicate Resource plugin definition: " +
+ resourceName);
+ return true;
+ }
+ return false;
}
public void initializePluggableDevicePlugins(Context context,
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java
index 87ce575..a41edba 100644
---
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/resourceplugin/TestResourcePluginManager.java
@@ -43,7 +43,6 @@ import
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.privileg
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
-import
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerException;
import
org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.*;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -59,6 +58,7 @@ import java.util.Map;
import java.io.File;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -124,38 +124,33 @@ public class TestResourcePluginManager extends
NodeManagerTestBase {
private class CustomizedResourceHandler implements ResourceHandler {
@Override
- public List<PrivilegedOperation> bootstrap(Configuration configuration)
- throws ResourceHandlerException {
+ public List<PrivilegedOperation> bootstrap(Configuration configuration) {
return null;
}
@Override
- public List<PrivilegedOperation> preStart(Container container)
- throws ResourceHandlerException {
+ public List<PrivilegedOperation> preStart(Container container) {
return null;
}
@Override
- public List<PrivilegedOperation> reacquireContainer(ContainerId
containerId)
- throws ResourceHandlerException {
+ public List<PrivilegedOperation> reacquireContainer(
+ ContainerId containerId) {
return null;
}
@Override
- public List<PrivilegedOperation> updateContainer(Container container)
- throws ResourceHandlerException {
+ public List<PrivilegedOperation> updateContainer(Container container) {
return null;
}
@Override
- public List<PrivilegedOperation> postComplete(ContainerId containerId)
- throws ResourceHandlerException {
+ public List<PrivilegedOperation> postComplete(ContainerId containerId) {
return null;
}
@Override
- public List<PrivilegedOperation> teardown()
- throws ResourceHandlerException {
+ public List<PrivilegedOperation> teardown() {
return null;
}
}
@@ -180,9 +175,9 @@ public class TestResourcePluginManager extends
NodeManagerTestBase {
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater,
ApplicationACLsManager aclsManager,
- LocalDirsHandlerService diskhandler) {
+ LocalDirsHandlerService dirsHandler) {
return new MyContainerManager(context, exec, del, nodeStatusUpdater,
- metrics, diskhandler);
+ metrics, dirsHandler);
}
@Override
@@ -210,7 +205,7 @@ public class TestResourcePluginManager extends
NodeManagerTestBase {
nm = new MyMockNM(rpm);
nm.init(conf);
- verify(rpm, times(1)).initialize(
+ verify(rpm).initialize(
any(Context.class));
}
@@ -231,7 +226,7 @@ public class TestResourcePluginManager extends
NodeManagerTestBase {
rpm.getNameToPlugins().get("resource1")
.getNodeResourceHandlerInstance();
- verify(nodeResourceUpdaterPlugin, times(1))
+ verify(nodeResourceUpdaterPlugin)
.updateConfiguredResource(any(Resource.class));
}
@@ -239,8 +234,7 @@ public class TestResourcePluginManager extends
NodeManagerTestBase {
* Make sure ResourcePluginManager is used to initialize ResourceHandlerChain
*/
@Test(timeout = 30000)
- public void testLinuxContainerExecutorWithResourcePluginsEnabled()
- throws Exception {
+ public void testLinuxContainerExecutorWithResourcePluginsEnabled() {
final ResourcePluginManager rpm = stubResourcePluginmanager();
final LinuxContainerExecutor lce = new MyLCE();
@@ -249,8 +243,8 @@ public class TestResourcePluginManager extends
NodeManagerTestBase {
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
((NMContext)context).setResourcePluginManager(rpm);
- return new BaseNodeStatusUpdaterForTest(context, dispatcher,
healthChecker,
- metrics, new BaseResourceTrackerForTest());
+ return new BaseNodeStatusUpdaterForTest(context, dispatcher,
+ healthChecker, metrics, new BaseResourceTrackerForTest());
}
@Override
@@ -258,9 +252,9 @@ public class TestResourcePluginManager extends
NodeManagerTestBase {
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater,
ApplicationACLsManager aclsManager,
- LocalDirsHandlerService diskhandler) {
+ LocalDirsHandlerService dirsHandler) {
return new MyContainerManager(context, exec, del, nodeStatusUpdater,
- metrics, diskhandler);
+ metrics, dirsHandler);
}
@Override
@@ -308,10 +302,10 @@ public class TestResourcePluginManager extends
NodeManagerTestBase {
false);
nm.init(conf);
nm.start();
- verify(rpmSpy, times(1)).initialize(
+ verify(rpmSpy).initialize(
any(Context.class));
verify(rpmSpy, times(0)).initializePluggableDevicePlugins(
- any(Context.class), any(Configuration.class), any(Map.class));
+ any(Context.class), any(Configuration.class), anyMap());
}
// No related configuration set.
@@ -325,10 +319,10 @@ public class TestResourcePluginManager extends
NodeManagerTestBase {
nm.init(conf);
nm.start();
- verify(rpmSpy, times(1)).initialize(
+ verify(rpmSpy).initialize(
any(Context.class));
verify(rpmSpy, times(0)).initializePluggableDevicePlugins(
- any(Context.class), any(Configuration.class), any(Map.class));
+ any(Context.class), any(Configuration.class), anyMap());
}
// Enable framework and configure pluggable device classes
@@ -347,10 +341,10 @@ public class TestResourcePluginManager extends
NodeManagerTestBase {
FakeTestDevicePlugin1.class.getCanonicalName());
nm.init(conf);
nm.start();
- verify(rpmSpy, times(1)).initialize(
+ verify(rpmSpy).initialize(
any(Context.class));
- verify(rpmSpy, times(1)).initializePluggableDevicePlugins(
- any(Context.class), any(Configuration.class), any(Map.class));
+ verify(rpmSpy).initializePluggableDevicePlugins(
+ any(Context.class), any(Configuration.class), anyMap());
}
// Enable pluggable framework, but leave device classes un-configured
@@ -362,7 +356,7 @@ public class TestResourcePluginManager extends
NodeManagerTestBase {
ResourcePluginManager rpmSpy = spy(rpm);
nm = new MyMockNM(rpmSpy);
- Boolean fail = false;
+ boolean fail = false;
try {
conf.setBoolean(YarnConfiguration.NM_PLUGGABLE_DEVICE_FRAMEWORK_ENABLED,
true);
@@ -371,18 +365,16 @@ public class TestResourcePluginManager extends
NodeManagerTestBase {
nm.start();
} catch (YarnRuntimeException e) {
fail = true;
- } catch (Exception e) {
+ } catch (Exception ignored) {
}
- verify(rpmSpy, times(1)).initializePluggableDevicePlugins(
- any(Context.class), any(Configuration.class), any(Map.class));
+ verify(rpmSpy).initializePluggableDevicePlugins(
+ any(Context.class), any(Configuration.class), anyMap());
Assert.assertTrue(fail);
}
@Test(timeout = 30000)
- public void testNormalInitializationOfPluggableDeviceClasses()
- throws Exception {
-
+ public void testNormalInitializationOfPluggableDeviceClasses() {
ResourcePluginManager rpm = new ResourcePluginManager();
ResourcePluginManager rpmSpy = spy(rpm);
@@ -399,16 +391,15 @@ public class TestResourcePluginManager extends
NodeManagerTestBase {
Assert.assertEquals(1, pluginMap.size());
ResourcePlugin rp = pluginMap.get("cmpA.com/hdwA");
if (!(rp instanceof DevicePluginAdapter)) {
- Assert.assertTrue(false);
+ Assert.fail();
}
- verify(rpmSpy, times(1)).checkInterfaceCompatibility(
+ verify(rpmSpy).checkInterfaceCompatibility(
DevicePlugin.class, FakeTestDevicePlugin1.class);
}
// Fail to load a class which doesn't implement interface DevicePlugin
@Test(timeout = 30000)
- public void testLoadInvalidPluggableDeviceClasses()
- throws Exception{
+ public void testLoadInvalidPluggableDeviceClasses() {
ResourcePluginManager rpm = new ResourcePluginManager();
ResourcePluginManager rpmSpy = spy(rpm);
@@ -435,8 +426,7 @@ public class TestResourcePluginManager extends
NodeManagerTestBase {
// Fail to register duplicated resource name.
@Test(timeout = 30000)
- public void testLoadDuplicateResourceNameDevicePlugin()
- throws Exception{
+ public void testLoadDuplicateResourceNameDevicePlugin() {
ResourcePluginManager rpm = new ResourcePluginManager();
ResourcePluginManager rpmSpy = spy(rpm);
@@ -469,8 +459,7 @@ public class TestResourcePluginManager extends
NodeManagerTestBase {
* It doesn't implement the "getRegisterRequestInfo"
*/
@Test(timeout = 30000)
- public void testIncompatibleDevicePlugin()
- throws Exception {
+ public void testIncompatibleDevicePlugin() {
ResourcePluginManager rpm = new ResourcePluginManager();
ResourcePluginManager rpmSpy = spy(rpm);
@@ -515,16 +504,15 @@ public class TestResourcePluginManager extends
NodeManagerTestBase {
nm.init(conf);
nm.start();
// only 1 plugin has the customized scheduler
- verify(rpmSpy, times(1)).checkInterfaceCompatibility(
+ verify(rpmSpy).checkInterfaceCompatibility(
DevicePlugin.class, FakeTestDevicePlugin1.class);
- verify(dmmSpy, times(1)).addDevicePluginScheduler(
+ verify(dmmSpy).addDevicePluginScheduler(
any(String.class), any(DevicePluginScheduler.class));
Assert.assertEquals(1, dmm.getDevicePluginSchedulers().size());
}
@Test(timeout = 30000)
- public void testRequestedResourceNameIsConfigured()
- throws Exception{
+ public void testRequestedResourceNameIsConfigured() {
ResourcePluginManager rpm = new ResourcePluginManager();
String resourceName = "a.com/a";
Assert.assertFalse(rpm.isConfiguredResourceName(resourceName));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]