Repository: incubator-slider Updated Branches: refs/heads/develop fc13c15c1 -> 971918661
SLIDER-558 enable port ranges for all containers Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/97191866 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/97191866 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/97191866 Branch: refs/heads/develop Commit: 971918661fb0ce973b4efb488323ca85c24f57a1 Parents: fc13c15 Author: Jon Maron <[email protected]> Authored: Sat Oct 25 17:21:01 2014 -0400 Committer: Jon Maron <[email protected]> Committed: Sat Oct 25 17:21:01 2014 -0400 ---------------------------------------------------------------------- .../python/agent/CustomServiceOrchestrator.py | 68 ++++++++++++++++---- .../agent/TestCustomServiceOrchestrator.py | 28 ++++++++ .../org/apache/slider/common/SliderKeys.java | 2 +- .../server/appmaster/SliderAppMaster.java | 4 +- .../standalone/TestStandaloneAgentAM.groovy | 47 ++++++++++++++ 5 files changed, 135 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/97191866/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py index 3932287..119c926 100644 --- a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py +++ b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py @@ -22,6 +22,7 @@ import logging import os import json import pprint +import random import sys import socket import posixpath @@ -33,6 +34,7 @@ from PythonExecutor import PythonExecutor import hostname import Constants +MAX_ATTEMPTS = 5 logger = logging.getLogger() @@ -252,13 +254,13 @@ class CustomServiceOrchestrator(): Its of the form {component_name.ALLOCATED_PORT}[{DEFAULT_default_port}][{PER_CONTAINER}] Either a port gets allocated or if not then just set the value to "0" """ - def finalize_command(self, command, store_command, allocated_ports): component = command['componentName'] allocated_for_this_component_format = "${{{0}.ALLOCATED_PORT}}" allocated_for_any = ".ALLOCATED_PORT}" port_allocation_req = allocated_for_this_component_format.format(component) + allowed_ports = self.get_allowed_ports(command) if 'configurations' in command: for key in command['configurations']: if len(command['configurations'][key]) > 0: @@ -269,7 +271,7 @@ class CustomServiceOrchestrator(): value = value.replace("${AGENT_LOG_ROOT}", self.config.getLogPath()) if port_allocation_req in value: - value = self.allocate_ports(value, port_allocation_req) + value = self.allocate_ports(value, port_allocation_req, allowed_ports) allocated_ports[key + "." + k] = value elif allocated_for_any in value: ## All unallocated ports should be set to 0 @@ -323,7 +325,7 @@ class CustomServiceOrchestrator(): append {DEFAULT_ and find the default value append {PER_CONTAINER} if it exists """ - def allocate_ports(self, value, port_req_pattern): + def allocate_ports(self, value, port_req_pattern, allowed_ports=None): default_port_pattern = "{DEFAULT_" do_not_propagate_pattern = "{PER_CONTAINER}" index = value.find(port_req_pattern) @@ -345,7 +347,7 @@ class CustomServiceOrchestrator(): if index == value.find(replaced_pattern + do_not_propagate_pattern): replaced_pattern = replaced_pattern + do_not_propagate_pattern pass - port = self.allocate_port(def_port) + port = self.allocate_port(def_port, allowed_ports) value = value.replace(replaced_pattern, str(port), 1) logger.info("Allocated port " + str(port) + " for " + replaced_pattern) index = value.find(port_req_pattern) @@ -354,24 +356,28 @@ class CustomServiceOrchestrator(): pass - def allocate_port(self, default_port=None): + def allocate_port(self, default_port=None, allowed_ports=None): if default_port != None: if self.is_port_available(default_port): return default_port - MAX_ATTEMPT = 5 - iter = 0 + port_list = [0] * MAX_ATTEMPTS + if allowed_ports != None: + port_list = allowed_ports + + i = 0 port = -1 - while iter < MAX_ATTEMPT: - iter = iter + 1 + itor = iter(port_list) + while i < min(len(port_list), MAX_ATTEMPTS): try: sock = socket.socket() - sock.bind(('', 0)) + sock.bind(('', itor.next())) port = sock.getsockname()[1] except Exception, err: - logger.info("Encountered error while trying to opening socket - " + str(err)) + logger.info("Encountered error while trying to open socket - " + str(err)) finally: sock.close() + i = i + 1 pass logger.info("Allocated dynamic port: " + str(port)) return port @@ -387,3 +393,43 @@ class CustomServiceOrchestrator(): return False + def get_allowed_ports(self, command): + allowed_ports = None + global_config = command['configurations'].get('global') + if global_config != None: + allowed_ports_value = global_config.get("slider.allowed.ports") + if allowed_ports_value: + allowed_ports = self.get_allowed_port_list(allowed_ports_value) + + return allowed_ports + + + def get_allowed_port_list(self, allowedPortsOptionValue, + num_values=MAX_ATTEMPTS): + selection = set() + invalid = set() + # tokens are comma seperated values + tokens = [x.strip() for x in allowedPortsOptionValue.split(',')] + for i in tokens: + try: + selection.add(int(i)) + except: + # should be a range + try: + token = [int(k.strip()) for k in i.split('-')] + if len(token) > 1: + token.sort() + first = token[0] + last = token[len(token)-1] + for x in range(first, last+1): + selection.add(x) + except: + # not an int and not a range... + invalid.add(i) + selection = random.sample(selection, min (len(selection), num_values)) + # Report invalid tokens before returning valid selection + logger.info("Allowed port values: " + str(selection)) + logger.warning("Invalid port range values: " + str(invalid)) + return selection + + http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/97191866/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py index 6ada7fa..a4cef94 100644 --- a/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py +++ b/slider-agent/src/test/python/agent/TestCustomServiceOrchestrator.py @@ -544,6 +544,34 @@ class TestCustomServiceOrchestrator(TestCase): self.assertFalse(port == -1) self.assertTrue(port > 0) + + def test_parse_allowed_port_values(self): + dummy_controller = MagicMock() + tempdir = tempfile.gettempdir() + tempWorkDir = tempdir + "W" + config = MagicMock() + config.get.return_value = "something" + config.getResolvedPath.return_value = tempdir + config.getWorkRootPath.return_value = tempWorkDir + config.getLogPath.return_value = tempdir + + orchestrator = CustomServiceOrchestrator(config, dummy_controller) + port_range = "48000-48005" + port_range_full_list = [48000, 48001, 48002, 48003, 48004, 48005] + allowed_ports = orchestrator.get_allowed_port_list(port_range, 3) + self.assertTrue(set(allowed_ports).issubset(port_range_full_list)) + + port_range = "48000 , 48005" + port_range_full_list = [48000, 48005] + allowed_ports = orchestrator.get_allowed_port_list(port_range, 1) + self.assertTrue(set(allowed_ports).issubset(port_range_full_list)) + + port_range = "48000 , 48004-48005" + port_range_full_list = [48000, 48004, 48005] + allowed_ports = orchestrator.get_allowed_port_list(port_range, 2) + self.assertTrue(set(allowed_ports).issubset(port_range_full_list)) + + def tearDown(self): # enable stdout sys.stdout = sys.__stdout__ http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/97191866/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java index 048dfa7..5f16e56 100644 --- a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java +++ b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java @@ -189,5 +189,5 @@ public interface SliderKeys extends SliderXmlConfKeys { String AM_FILTER_NAME = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer"; - String KEY_AM_ALLOWED_PORT_RANGE = "slider.am.allowed.port.range"; + String KEY_ALLOWED_PORT_RANGE = "site.global.slider.allowed.ports"; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/97191866/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index 7a1711f..bb198a1 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -917,8 +917,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService throws SliderException { int portToRequest = 0; String portRange = instanceDefinition. - getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM) - .getOption(SliderKeys.KEY_AM_ALLOWED_PORT_RANGE , "0"); + getAppConfOperations().getGlobalOptions(). + getOption(SliderKeys.KEY_ALLOWED_PORT_RANGE, "0"); if (!"0".equals(portRange)) { if (portScanner == null) { portScanner = new PortScanner(); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/97191866/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAgentAM.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAgentAM.groovy b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAgentAM.groovy index 60b4dd8..f04583e 100644 --- a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAgentAM.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAgentAM.groovy @@ -20,17 +20,23 @@ package org.apache.slider.agent.standalone import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import org.apache.hadoop.fs.Path import org.apache.hadoop.yarn.api.records.ApplicationId import org.apache.hadoop.yarn.api.records.ApplicationReport import org.apache.hadoop.yarn.api.records.YarnApplicationState +import org.apache.hadoop.yarn.exceptions.YarnException import org.apache.slider.agent.AgentMiniClusterTestBase import org.apache.slider.api.ClusterNode import org.apache.slider.client.SliderClient import org.apache.slider.common.SliderKeys import org.apache.slider.common.params.ActionRegistryArgs +import org.apache.slider.core.build.InstanceBuilder +import org.apache.slider.core.conf.AggregateConf import org.apache.slider.core.exceptions.SliderException +import org.apache.slider.core.launch.LaunchedApplication import org.apache.slider.core.main.LauncherExitCodes import org.apache.slider.core.main.ServiceLauncher +import org.apache.slider.core.persist.LockAcquireFailedException import org.junit.Test @CompileStatic @@ -159,8 +165,49 @@ class TestStandaloneAgentAM extends AgentMiniClusterTestBase { clustername) assert instance3.yarnApplicationState >= YarnApplicationState.FINISHED + //create another AM, this time with a port range + setSliderClientClassName(TestSliderClient.name) + try { + launcher = createStandaloneAM(clustername, true, true) + client = launcher.service + i2AppID = client.applicationId + + reportFor = client.getApplicationReport(i2AppID) + URI uri = new URI(reportFor.originalTrackingUrl) + assert uri.port in [60000, 60001, 60002, 60003] + assert reportFor.rpcPort in [60000, 60001, 60002, 60003] + assert 0 == clusterActionFreeze(client, clustername) + + } finally { + setSliderClientClassName(SliderClient.name) + } } + static class TestSliderClient extends SliderClient { + @Override + protected void persistInstanceDefinition(boolean overwrite, + Path appconfdir, + InstanceBuilder builder) + throws IOException, SliderException, LockAcquireFailedException { + AggregateConf conf = builder.getInstanceDescription() + conf.getAppConfOperations().getGlobalOptions().put( + SliderKeys.KEY_ALLOWED_PORT_RANGE, + "60000-60003") + super.persistInstanceDefinition(overwrite, appconfdir, builder) + } + + @Override + LaunchedApplication launchApplication(String clustername, + Path clusterDirectory, + AggregateConf instanceDefinition, + boolean debugAM) + throws YarnException, IOException { + instanceDefinition.getAppConfOperations().getGlobalOptions().put( + SliderKeys.KEY_ALLOWED_PORT_RANGE, + "60000-60003") + return super.launchApplication(clustername, clusterDirectory, instanceDefinition, debugAM) + } + } }
