Repository: hive Updated Branches: refs/heads/master 667e9dd50 -> e2bd513a3
HIVE-12222: Define port range in property for RPCServer (Aihua Xu, reviewed by Xuefu Zhang) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e2bd513a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e2bd513a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e2bd513a Branch: refs/heads/master Commit: e2bd513a3970b141576f7ead25fc6cfcc5fcda17 Parents: 667e9dd Author: Aihua Xu <aihu...@apache.org> Authored: Thu Sep 22 14:20:51 2016 -0400 Committer: Aihua Xu <aihu...@apache.org> Committed: Wed Sep 28 12:07:40 2016 -0400 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 3 ++ .../hive/spark/client/rpc/RpcConfiguration.java | 38 +++++++++++++++++ .../apache/hive/spark/client/rpc/RpcServer.java | 44 +++++++++++++++++--- .../apache/hive/spark/client/rpc/TestRpc.java | 37 +++++++++++++++- 4 files changed, 115 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/e2bd513a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 43a16d7..4c3ef3e 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3080,6 +3080,9 @@ public class HiveConf extends Configuration { "Default is empty, which means the address will be determined in the same way as for hive.server2.thrift.bind.host." + "This is only necessary if the host has mutiple network addresses and if a different network address other than " + "hive.server2.thrift.bind.host is to be used."), + SPARK_RPC_SERVER_PORT("hive.spark.client.rpc.server.port", "", "A list of port ranges which can be used by RPC server " + + "with the format of 49152-49222,49228 and a random one is selected from the list. Default is empty, which randomly " + + "selects one port from all available ones."), SPARK_DYNAMIC_PARTITION_PRUNING( "hive.spark.dynamic.partition.pruning", false, "When dynamic pruning is enabled, joins on partition keys will be processed by writing\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/e2bd513a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java index 210f8a4..8c59015 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java @@ -18,7 +18,9 @@ package org.apache.hive.spark.client.rpc; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -107,6 +109,42 @@ public final class RpcConfiguration { return ServerUtils.getHostAddress(hiveHost).getHostName(); } + /** + * Parses the port string like 49152-49222,49228 into the port list. A default 0 + * is added for the empty port string. + * @return a list of configured ports. + * @exception IOException is thrown if the property is not configured properly + */ + List<Integer> getServerPorts() throws IOException { + String errMsg = "Incorrect RPC server port configuration for HiveServer2"; + String portString = config.get(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname); + ArrayList<Integer> ports = new ArrayList<Integer>(); + try { + if(!StringUtils.isEmpty(portString)) { + for (String portRange : portString.split(",")) { + String[] range = portRange.split("-"); + if (range.length == 0 || range.length > 2 + || (range.length == 2 && Integer.valueOf(range[0]) > Integer.valueOf(range[1]))) { + throw new IOException(errMsg); + } + if (range.length == 1) { + ports.add(Integer.valueOf(range[0])); + } else { + for (int i = Integer.valueOf(range[0]); i <= Integer.valueOf(range[1]); i++) { + ports.add(i); + } + } + } + } else { + ports.add(0); + } + + return ports; + } catch(NumberFormatException e) { + throw new IOException(errMsg); + } + } + String getRpcChannelLogLevel() { return config.get(HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname); } http://git-wip-us.apache.org/repos/asf/hive/blob/e2bd513a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java index 68ee627..657494a 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java @@ -21,10 +21,13 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.security.SecureRandom; +import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; + import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; import javax.security.auth.callback.NameCallback; @@ -39,8 +42,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -51,9 +56,9 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.ScheduledFuture; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hadoop.hive.common.classification.InterfaceAudience; /** @@ -82,7 +87,7 @@ public class RpcServer implements Closeable { .setNameFormat("RPC-Handler-%d") .setDaemon(true) .build()); - this.channel = new ServerBootstrap() + ServerBootstrap serverBootstrap = new ServerBootstrap() .group(group) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @@ -107,16 +112,43 @@ public class RpcServer implements Closeable { }) .option(ChannelOption.SO_BACKLOG, 1) .option(ChannelOption.SO_REUSEADDR, true) - .childOption(ChannelOption.SO_KEEPALIVE, true) - .bind(0) - .sync() - .channel(); + .childOption(ChannelOption.SO_KEEPALIVE, true); + + this.channel = bindServerPort(serverBootstrap).channel(); this.port = ((InetSocketAddress) channel.localAddress()).getPort(); this.pendingClients = Maps.newConcurrentMap(); this.address = this.config.getServerAddress(); } /** + * Retry the list of configured ports until one is found + * @param serverBootstrap + * @return + * @throws InterruptedException + * @throws IOException + */ + private ChannelFuture bindServerPort(ServerBootstrap serverBootstrap) + throws InterruptedException, IOException { + List<Integer> ports = config.getServerPorts(); + if (ports.contains(0)) { + return serverBootstrap.bind(0).sync(); + } else { + Random rand = new Random(); + while(!ports.isEmpty()) { + int index = rand.nextInt(ports.size()); + int port = ports.get(index); + ports.remove(index); + try { + return serverBootstrap.bind(port).sync(); + } catch(Exception e) { + // Retry the next port + } + } + throw new IOException("No available ports from configured RPC Server ports for HiveServer2"); + } + } + + /** * Tells the RPC server to expect a connection from a new client. * * @param clientId An identifier for the client. Must be unique. http://git-wip-us.apache.org/repos/asf/hive/blob/e2bd513a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java ---------------------------------------------------------------------- diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java index 7bcf1df..77c3d02 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java @@ -19,10 +19,10 @@ package org.apache.hive.spark.client.rpc; import java.io.Closeable; import java.net.InetAddress; +import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -168,6 +168,41 @@ public class TestRpc { } @Test + public void testServerPort() throws Exception { + Map<String, String> config = new HashMap<String, String>(); + + RpcServer server0 = new RpcServer(config); + assertTrue("Empty port range should return a random valid port: " + server0.getPort(), server0.getPort() >= 0); + IOUtils.closeQuietly(server0); + + config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, "49152-49222,49223,49224-49333"); + RpcServer server1 = new RpcServer(config); + assertTrue("Port should be within configured port range:" + server1.getPort(), server1.getPort() >= 49152 && server1.getPort() <= 49333); + IOUtils.closeQuietly(server1); + + int expectedPort = 65535; + config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, String.valueOf(expectedPort)); + RpcServer server2 = new RpcServer(config); + assertTrue("Port should match configured one: " + server2.getPort(), server2.getPort() == expectedPort); + IOUtils.closeQuietly(server2); + + config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, "49552-49222,49223,49224-49333"); + try { + autoClose(new RpcServer(config)); + assertTrue("Invalid port range should throw an exception", false); // Should not reach here + } catch(IOException e) { + assertEquals("Incorrect RPC server port configuration for HiveServer2", e.getMessage()); + } + + // Retry logic + expectedPort = 65535; + config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, String.valueOf(expectedPort) + ",21-23"); + RpcServer server3 = new RpcServer(config); + assertTrue("Port should match configured one:" + server3.getPort(), server3.getPort() == expectedPort); + IOUtils.closeQuietly(server3); + } + + @Test public void testCloseListener() throws Exception { RpcServer server = autoClose(new RpcServer(emptyConfig)); Rpc[] rpcs = createRpcConnection(server);