Repository: incubator-slider Updated Branches: refs/heads/develop e13d20e3a -> 6be4bfadc
SLIDER-505 enable port range specification for AM Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/6be4bfad Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/6be4bfad Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/6be4bfad Branch: refs/heads/develop Commit: 6be4bfadcec39f6912861277d74e0d8e0a46e603 Parents: e13d20e Author: Jon Maron <[email protected]> Authored: Wed Oct 22 15:09:54 2014 -0400 Committer: Jon Maron <[email protected]> Committed: Wed Oct 22 15:09:54 2014 -0400 ---------------------------------------------------------------------- .../org/apache/slider/common/SliderKeys.java | 2 + .../apache/slider/common/tools/PortScanner.java | 90 +++++++++++++++++++ .../server/appmaster/SliderAppMaster.java | 30 ++++++- .../slider/common/tools/TestPortScan.groovy | 91 +++++++++++++++++++- 4 files changed, 208 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6be4bfad/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java index e75ec73..ae58ef3 100644 --- a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java +++ b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java @@ -182,4 +182,6 @@ public interface SliderKeys extends SliderXmlConfKeys { */ String AM_FILTER_NAME = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer"; + + String KEY_AM_ALLOWED_PORT_RANGE = "slider.am.allowed.port.range"; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6be4bfad/slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java b/slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java new file mode 100644 index 0000000..0f4cfbc --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/common/tools/PortScanner.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.slider.common.tools; + +import org.apache.slider.common.SliderExitCodes; +import org.apache.slider.core.exceptions.SliderException; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * + */ +public class PortScanner { + private static Pattern NUMBER_RANGE = Pattern.compile("^(\\d+)\\s*-\\s*(\\d+)$"); + private static Pattern SINGLE_NUMBER = Pattern.compile("^\\d+$"); + + private List<Integer> remainingPortsToCheck; + + public PortScanner() { + } + + public void setPortRange(String input) { + // first split based on commas + Set<Integer> inputPorts= new TreeSet<Integer>(); + String[] ranges = input.split(","); + for ( String range : ranges ) { + Matcher m = SINGLE_NUMBER.matcher(range.trim()); + if (m.find()) { + inputPorts.add(Integer.parseInt(m.group())); + } else { + m = NUMBER_RANGE.matcher(range.trim()); + if (m.find()) { + String[] boundaryValues = m.group(0).split("-"); + int start = Integer.parseInt(boundaryValues[0].trim()); + int end = Integer.parseInt(boundaryValues[1].trim()); + for (int i = start; i < end + 1; i++) { + inputPorts.add(i); + } + } + } + } + this.remainingPortsToCheck = new ArrayList<Integer>(inputPorts); + } + + public List<Integer> getRemainingPortsToCheck() { + return remainingPortsToCheck; + } + + public int getAvailablePort () throws SliderException{ + boolean found = false; + int availablePort = -1; + Iterator<Integer> portsToCheck = this.remainingPortsToCheck.iterator(); + while (portsToCheck.hasNext() && !found) { + int portToCheck = portsToCheck.next(); + found = SliderUtils.isPortAvailable(portToCheck); + if (found) { + availablePort = portToCheck; + portsToCheck.remove(); + } + } + + if (availablePort < 0) { + throw new SliderException(SliderExitCodes.EXIT_BAD_CONFIGURATION, + "No available ports found in configured range {}", + remainingPortsToCheck); + } + + return availablePort; + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6be4bfad/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index d696f45..e7fa109 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -88,6 +88,7 @@ import org.apache.slider.common.params.SliderAMArgs; import org.apache.slider.common.params.SliderAMCreateAction; import org.apache.slider.common.params.SliderActions; import org.apache.slider.common.tools.ConfigHelper; +import org.apache.slider.common.tools.PortScanner; import org.apache.slider.common.tools.SliderFileSystem; import org.apache.slider.common.tools.SliderUtils; import org.apache.slider.common.tools.SliderVersionInfo; @@ -375,6 +376,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService private YarnRegistryViewForProviders yarnRegistryOperations; private FsDelegationTokenManager fsDelegationTokenManager; private RegisterApplicationMasterResponse amRegistrationData; + private PortScanner portScanner; /** * Service Constructor @@ -656,7 +658,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService secretManager = new ClientToAMTokenSecretManager(appAttemptID, null); //bring up the Slider RPC service - startSliderRPCServer(); + startSliderRPCServer(instanceDefinition); rpcServiceAddress = rpcService.getConnectAddress(); appMasterHostname = rpcServiceAddress.getHostName(); @@ -690,6 +692,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService startAgentWebApp(appInformation, serviceConf); + int port = getPortToRequest(instanceDefinition); + webApp = new SliderAMWebApp(registryOperations); WebApps.$for(SliderAMWebApp.BASE_PATH, WebAppApi.class, new WebAppApiImpl(this, @@ -698,6 +702,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService certificateManager, registryOperations), RestPaths.WS_CONTEXT) .withHttpPolicy(serviceConf, HttpConfig.Policy.HTTP_ONLY) + .at(port) .start(webApp); String scheme = WebAppUtils.HTTP_PREFIX; appMasterTrackingUrl = scheme + appMasterHostname + ":" + webApp.port(); @@ -900,6 +905,23 @@ public class SliderAppMaster extends AbstractSliderLaunchedService return finish(); } + private int getPortToRequest(AggregateConf instanceDefinition) + throws SliderException { + int portToRequest = 0; + String portRange = instanceDefinition. + getAppConfOperations().getComponent(SliderKeys.COMPONENT_AM) + .getOption(SliderKeys.KEY_AM_ALLOWED_PORT_RANGE , "0"); + if (!"0".equals(portRange)) { + if (portScanner == null) { + portScanner = new PortScanner(); + portScanner.setPortRange(portRange); + } + portToRequest = portScanner.getAvailablePort(); + } + + return portToRequest; + } + private void uploadServerCertForLocalization(String clustername, SliderFileSystem fs) throws IOException { @@ -1334,7 +1356,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService /** * Start the slider RPC server */ - private void startSliderRPCServer() throws IOException, BadConfigException { + private void startSliderRPCServer(AggregateConf instanceDefinition) + throws IOException, SliderException { // verify that if the cluster is authed, the ACLs are set. boolean authorization = getConfig().getBoolean( @@ -1352,9 +1375,10 @@ public class SliderAppMaster extends AbstractSliderLaunchedService .newReflectiveBlockingService( protobufRelay); + int port = getPortToRequest(instanceDefinition); rpcService = new WorkflowRpcService("SliderRPC", RpcBinder.createProtobufServer( - new InetSocketAddress("0.0.0.0", 0), + new InetSocketAddress("0.0.0.0", port), getConfig(), secretManager, NUM_RPC_HANDLERS, http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/6be4bfad/slider-core/src/test/groovy/org/apache/slider/common/tools/TestPortScan.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/common/tools/TestPortScan.groovy b/slider-core/src/test/groovy/org/apache/slider/common/tools/TestPortScan.groovy index 49bd58e..f009e25 100644 --- a/slider-core/src/test/groovy/org/apache/slider/common/tools/TestPortScan.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/common/tools/TestPortScan.groovy @@ -18,11 +18,11 @@ package org.apache.slider.common.tools -import groovy.transform.CompileStatic +import org.apache.slider.core.exceptions.SliderException import org.junit.Test -@CompileStatic class TestPortScan { + final shouldFail = new GroovyTestCase().&shouldFail @Test public void testScanPorts() throws Throwable { @@ -38,4 +38,91 @@ class TestPortScan { server.close() } } + + @Test + public void testRequestedPortsLogic() throws Throwable { + PortScanner portScanner = new PortScanner() + portScanner.setPortRange("5,6,8-10, 11,14 ,20 - 22") + List<Integer> ports = portScanner.remainingPortsToCheck + def expectedPorts = [5,6,8,9,10,11,14,20,21,22] + assert ports == expectedPorts + } + + @Test + public void testRequestedPortsOutOfOrder() throws Throwable { + PortScanner portScanner = new PortScanner() + portScanner.setPortRange("8-10,5,6, 11,20 - 22, 14 ") + List<Integer> ports = portScanner.remainingPortsToCheck + def expectedPorts = [5,6,8,9,10,11,14,20,21,22] + assert ports == expectedPorts + } + + @Test + public void testFindAvailablePortInRange() throws Throwable { + ServerSocket server = new ServerSocket(0) + try { + int serverPort = server.getLocalPort() + + PortScanner portScanner = new PortScanner() + portScanner.setPortRange("" + (serverPort-1) + "-" + (serverPort + 3)) + int port = portScanner.availablePort + assert port != serverPort + assert port >= serverPort -1 && port <= serverPort + 3 + } finally { + server.close() + } + } + + @Test + public void testFindAvailablePortInList() throws Throwable { + ServerSocket server = new ServerSocket(0) + try { + int serverPort = server.getLocalPort() + + PortScanner portScanner = new PortScanner() + portScanner.setPortRange("" + (serverPort-1) + ", " + (serverPort + 1)) + int port = portScanner.availablePort + assert port != serverPort + assert port == serverPort -1 || port == serverPort + 1 + } finally { + server.close() + } + } + + @Test + public void testNoAvailablePorts() throws Throwable { + ServerSocket server1 = new ServerSocket(0) + ServerSocket server2 = new ServerSocket(0) + try { + int serverPort1 = server1.getLocalPort() + int serverPort2 = server2.getLocalPort() + + PortScanner portScanner = new PortScanner() + portScanner.setPortRange("" + serverPort1+ ", " + serverPort2) + shouldFail(SliderException) { + portScanner.availablePort + } + } finally { + server1.close() + server2.close() + } + } + + @Test + public void testPortRemovedFromRange() throws Throwable { + ServerSocket server = new ServerSocket(0) + try { + int serverPort = server.getLocalPort() + + PortScanner portScanner = new PortScanner() + portScanner.setPortRange("" + (serverPort-1) + "-" + (serverPort + 3)) + int port = portScanner.availablePort + assert port != serverPort + assert port >= serverPort -1 && port <= serverPort + 3 + def isPortInList = port in portScanner.remainingPortsToCheck + assert !isPortInList + } finally { + server.close() + } + } }
