Repository: hive
Updated Branches:
  refs/heads/master 667e9dd50 -> e2bd513a3


HIVE-12222: Define port range in property for RPCServer (Aihua Xu, reviewed by Xuefu Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e2bd513a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e2bd513a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e2bd513a

Branch: refs/heads/master
Commit: e2bd513a3970b141576f7ead25fc6cfcc5fcda17
Parents: 667e9dd
Author: Aihua Xu <aihu...@apache.org>
Authored: Thu Sep 22 14:20:51 2016 -0400
Committer: Aihua Xu <aihu...@apache.org>
Committed: Wed Sep 28 12:07:40 2016 -0400

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  3 ++
 .../hive/spark/client/rpc/RpcConfiguration.java | 38 +++++++++++++++++
 .../apache/hive/spark/client/rpc/RpcServer.java | 44 +++++++++++++++++---
 .../apache/hive/spark/client/rpc/TestRpc.java   | 37 +++++++++++++++-
 4 files changed, 115 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e2bd513a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 43a16d7..4c3ef3e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3080,6 +3080,9 @@ public class HiveConf extends Configuration {
       "Default is empty, which means the address will be determined in the 
same way as for hive.server2.thrift.bind.host." +
       "This is only necessary if the host has mutiple network addresses and if 
a different network address other than " +
       "hive.server2.thrift.bind.host is to be used."),
+    SPARK_RPC_SERVER_PORT("hive.spark.client.rpc.server.port", "", "A list of 
port ranges which can be used by RPC server " +
+        "with the format of 49152-49222,49228 and a random one is selected 
from the list. Default is empty, which randomly " +
+        "selects one port from all available ones."),
     SPARK_DYNAMIC_PARTITION_PRUNING(
         "hive.spark.dynamic.partition.pruning", false,
         "When dynamic pruning is enabled, joins on partition keys will be 
processed by writing\n" +

http://git-wip-us.apache.org/repos/asf/hive/blob/e2bd513a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
index 210f8a4..8c59015 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
@@ -18,7 +18,9 @@
 package org.apache.hive.spark.client.rpc;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -107,6 +109,42 @@ public final class RpcConfiguration {
     return ServerUtils.getHostAddress(hiveHost).getHostName();
   }
 
+  /**
+   * Parses the port string like 49152-49222,49228 into the port list. A 
default 0
+   * is added for the empty port string.
+   * @return a list of configured ports.
+   * @exception IOException is thrown if the property is not configured 
properly
+   */
+  List<Integer> getServerPorts() throws IOException {
+    String errMsg = "Incorrect RPC server port configuration for HiveServer2";
+    String portString = 
config.get(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname);
+    ArrayList<Integer> ports = new ArrayList<Integer>();
+    try {
+      if(!StringUtils.isEmpty(portString)) {
+        for (String portRange : portString.split(",")) {
+          String[] range = portRange.split("-");
+          if (range.length == 0 || range.length > 2
+              || (range.length == 2 && Integer.valueOf(range[0]) > 
Integer.valueOf(range[1]))) {
+            throw new IOException(errMsg);
+          }
+          if (range.length == 1) {
+            ports.add(Integer.valueOf(range[0]));
+          } else {
+            for (int i = Integer.valueOf(range[0]); i <= 
Integer.valueOf(range[1]); i++) {
+              ports.add(i);
+            }
+          }
+        }
+      } else {
+        ports.add(0);
+      }
+
+      return ports;
+    } catch(NumberFormatException e) {
+      throw new IOException(errMsg);
+    }
+  }
+
   String getRpcChannelLogLevel() {
     return config.get(HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e2bd513a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index 68ee627..657494a 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -21,10 +21,13 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.SecureRandom;
+import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
+
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
@@ -39,8 +42,10 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
@@ -51,9 +56,9 @@ import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.Promise;
 import io.netty.util.concurrent.ScheduledFuture;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 
 /**
@@ -82,7 +87,7 @@ public class RpcServer implements Closeable {
             .setNameFormat("RPC-Handler-%d")
             .setDaemon(true)
             .build());
-    this.channel = new ServerBootstrap()
+     ServerBootstrap serverBootstrap = new ServerBootstrap()
       .group(group)
       .channel(NioServerSocketChannel.class)
       .childHandler(new ChannelInitializer<SocketChannel>() {
@@ -107,16 +112,43 @@ public class RpcServer implements Closeable {
       })
       .option(ChannelOption.SO_BACKLOG, 1)
       .option(ChannelOption.SO_REUSEADDR, true)
-      .childOption(ChannelOption.SO_KEEPALIVE, true)
-      .bind(0)
-      .sync()
-      .channel();
+      .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+    this.channel = bindServerPort(serverBootstrap).channel();
     this.port = ((InetSocketAddress) channel.localAddress()).getPort();
     this.pendingClients = Maps.newConcurrentMap();
     this.address = this.config.getServerAddress();
   }
 
   /**
+   * Retry the list of configured ports until one is found
+   * @param serverBootstrap
+   * @return
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  private ChannelFuture bindServerPort(ServerBootstrap serverBootstrap)
+      throws InterruptedException, IOException {
+    List<Integer> ports = config.getServerPorts();
+    if (ports.contains(0)) {
+      return serverBootstrap.bind(0).sync();
+    } else {
+      Random rand = new Random();
+      while(!ports.isEmpty()) {
+        int index = rand.nextInt(ports.size());
+        int port = ports.get(index);
+        ports.remove(index);
+        try {
+          return serverBootstrap.bind(port).sync();
+        } catch(Exception e) {
+          // Retry the next port
+        }
+      }
+      throw new IOException("No available ports from configured RPC Server 
ports for HiveServer2");
+    }
+  }
+
+  /**
    * Tells the RPC server to expect a connection from a new client.
    *
    * @param clientId An identifier for the client. Must be unique.

http://git-wip-us.apache.org/repos/asf/hive/blob/e2bd513a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
index 7bcf1df..77c3d02 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/rpc/TestRpc.java
@@ -19,10 +19,10 @@ package org.apache.hive.spark.client.rpc;
 
 import java.io.Closeable;
 import java.net.InetAddress;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -168,6 +168,41 @@ public class TestRpc {
   }
 
   @Test
+  public void testServerPort() throws Exception {
+    Map<String, String> config = new HashMap<String, String>();
+
+    RpcServer server0 = new RpcServer(config);
+    assertTrue("Empty port range should return a random valid port: " + 
server0.getPort(), server0.getPort() >= 0);
+    IOUtils.closeQuietly(server0);
+
+    config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, 
"49152-49222,49223,49224-49333");
+    RpcServer server1 = new RpcServer(config);
+    assertTrue("Port should be within configured port range:" + 
server1.getPort(), server1.getPort() >= 49152 && server1.getPort() <= 49333);
+    IOUtils.closeQuietly(server1);
+
+    int expectedPort = 65535;
+    config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, 
String.valueOf(expectedPort));
+    RpcServer server2 = new RpcServer(config);
+    assertTrue("Port should match configured one: " + server2.getPort(), 
server2.getPort() == expectedPort);
+    IOUtils.closeQuietly(server2);
+
+    config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, 
"49552-49222,49223,49224-49333");
+    try {
+      autoClose(new RpcServer(config));
+      assertTrue("Invalid port range should throw an exception", false); // 
Should not reach here
+    } catch(IOException e) {
+      assertEquals("Incorrect RPC server port configuration for HiveServer2", 
e.getMessage());
+    }
+
+    // Retry logic
+    expectedPort = 65535;
+    config.put(HiveConf.ConfVars.SPARK_RPC_SERVER_PORT.varname, 
String.valueOf(expectedPort) + ",21-23");
+    RpcServer server3 = new RpcServer(config);
+    assertTrue("Port should match configured one:" + server3.getPort(), 
server3.getPort() == expectedPort);
+    IOUtils.closeQuietly(server3);
+  }
+
+  @Test
   public void testCloseListener() throws Exception {
     RpcServer server = autoClose(new RpcServer(emptyConfig));
     Rpc[] rpcs = createRpcConnection(server);

Reply via email to