HIVE-12568: Provide an option to specify network interface used by Spark remote client [Spark Branch] (reviewed by Jimmy)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9f57569b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9f57569b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9f57569b

Branch: refs/heads/master
Commit: 9f57569b0f648bb5596df60e0a62db06930778ea
Parents: 72e070f
Author: xzhang <xzhang@xzdt>
Authored: Mon Dec 7 11:10:25 2015 -0800
Committer: Rui Li <[email protected]>
Committed: Thu Jan 28 14:50:07 2016 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/ServerUtils.java  | 19 +++++++
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  5 ++
 .../service/cli/thrift/ThriftCLIService.java    | 15 +++---
 .../hive/spark/client/rpc/RpcConfiguration.java | 57 +++++++-------------
 4 files changed, 50 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9f57569b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
index 83517ce..b44f92f 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ServerUtils.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.common;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -47,4 +50,20 @@ public class ServerUtils {
     }
   }
 
+  /**
+   * Get the InetAddress of the machine with the given host name.
+   * @param hostname The name of the host
+   * @return The network address of the host
+   * @throws UnknownHostException
+   */
+  public static InetAddress getHostAddress(String hostname) throws UnknownHostException {
+    InetAddress serverIPAddress;
+    if (hostname != null && !hostname.isEmpty()) {
+      serverIPAddress = InetAddress.getByName(hostname);
+    } else {
+      serverIPAddress = InetAddress.getLocalHost();
+    }
+    return serverIPAddress;
+  }
+
 }


http://git-wip-us.apache.org/repos/asf/hive/blob/9f57569b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3e10fd4..c4034a5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2601,6 +2601,11 @@ public class HiveConf extends Configuration {
         "Channel logging level for remote Spark driver. One of {DEBUG, ERROR, INFO, TRACE, WARN}."),
     SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5",
         "Name of the SASL mechanism to use for authentication."),
+    SPARK_RPC_SERVER_ADDRESS("hive.spark.client.rpc.server.address", "",
+        "The server address of the HiveServer2 host to be used for communication between Hive client and remote Spark driver. " +
+        "Default is empty, which means the address will be determined in the same way as for hive.server2.thrift.bind.host. " +
+        "This is only necessary if the host has multiple network addresses and if a network address other than " +
+        "hive.server2.thrift.bind.host is to be used."),
     SPARK_DYNAMIC_PARTITION_PRUNING(
         "hive.spark.dynamic.partition.pruning", false,
         "When dynamic pruning is enabled, joins on partition keys will be processed by writing\n" +


http://git-wip-us.apache.org/repos/asf/hive/blob/9f57569b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
index 78b4b31..5c8fa3c 100644
--- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
+++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hive.service.AbstractService;
 import org.apache.hive.service.ServiceException;
 import org.apache.hive.service.ServiceUtils;
@@ -203,21 +204,19 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe
   @Override
   public synchronized void init(HiveConf hiveConf) {
     this.hiveConf = hiveConf;
-    // Initialize common server configs needed in both binary & http modes
-    String portString;
-    hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+
+    String hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
     if (hiveHost == null) {
       hiveHost = hiveConf.getVar(ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
     }
     try {
-      if (hiveHost != null && !hiveHost.isEmpty()) {
-        serverIPAddress = InetAddress.getByName(hiveHost);
-      } else {
-        serverIPAddress = InetAddress.getLocalHost();
-      }
+      serverIPAddress = ServerUtils.getHostAddress(hiveHost);
     } catch (UnknownHostException e) {
       throw new ServiceException(e);
     }
+
+    // Initialize common server configs needed in both binary & http modes
+    String portString;
     // HTTP mode
     if (HiveServer2.isHTTPTransportMode(hiveConf)) {
       workerKeepAliveTime =


http://git-wip-us.apache.org/repos/asf/hive/blob/9f57569b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
index 9c8cea0..e387659 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcConfiguration.java
@@ -18,20 +18,19 @@
 package org.apache.hive.spark.client.rpc;
 
 import java.io.IOException;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+
 import javax.security.sasl.Sasl;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.conf.HiveConf;
 
@@ -49,15 +48,14 @@ public final class RpcConfiguration {
     HiveConf.ConfVars.SPARK_RPC_CHANNEL_LOG_LEVEL.varname,
     HiveConf.ConfVars.SPARK_RPC_MAX_MESSAGE_SIZE.varname,
     HiveConf.ConfVars.SPARK_RPC_MAX_THREADS.varname,
-    HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname
+    HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.varname,
+    HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname
   );
 
   public static final ImmutableSet<String> HIVE_SPARK_TIME_CONFIGS = ImmutableSet.of(
     HiveConf.ConfVars.SPARK_RPC_CLIENT_CONNECT_TIMEOUT.varname,
     HiveConf.ConfVars.SPARK_RPC_CLIENT_HANDSHAKE_TIMEOUT.varname
   );
 
-  public static final String SERVER_LISTEN_ADDRESS_KEY = "hive.spark.client.server.address";
-
   /** Prefix for other SASL options. */
   public static final String RPC_SASL_OPT_PREFIX = "hive.spark.client.rpc.sasl.";
 
@@ -91,39 +89,22 @@ public final class RpcConfiguration {
     return value != null ? Integer.parseInt(value) : HiveConf.ConfVars.SPARK_RPC_SECRET_RANDOM_BITS.defaultIntVal;
   }
 
+  /**
+   * Here we assume that the remote driver will connect back to HS2 using the same network interface
+   * as if it were just a HS2 client. If this isn't true, we can have a separate configuration for that.
+   * For now, I think we are okay.
+   * @return server host name in the network
+   * @throws IOException
+   */
   String getServerAddress() throws IOException {
-    String value = config.get(SERVER_LISTEN_ADDRESS_KEY);
-    if (value != null) {
-      return value;
-    }
-
-    InetAddress address = InetAddress.getLocalHost();
-    if (address.isLoopbackAddress()) {
-      // Address resolves to something like 127.0.1.1, which happens on Debian;
-      // try to find a better address using the local network interfaces
-      Enumeration<NetworkInterface> ifaces = NetworkInterface.getNetworkInterfaces();
-      while (ifaces.hasMoreElements()) {
-        NetworkInterface ni = ifaces.nextElement();
-        Enumeration<InetAddress> addrs = ni.getInetAddresses();
-        while (addrs.hasMoreElements()) {
-          InetAddress addr = addrs.nextElement();
-          if (!addr.isLinkLocalAddress() && !addr.isLoopbackAddress()
-              && addr instanceof Inet4Address) {
-            // We've found an address that looks reasonable!
-            LOG.warn("Your hostname, {}, resolves to a loopback address; using {} "
-                + " instead (on interface {})", address.getHostName(), addr.getHostAddress(),
-                ni.getName());
-            LOG.warn("Set '{}' if you need to bind to another address.", SERVER_LISTEN_ADDRESS_KEY);
-            return addr.getHostAddress();
-          }
-        }
+    String hiveHost = config.get(HiveConf.ConfVars.SPARK_RPC_SERVER_ADDRESS.varname);
+    if (StringUtils.isEmpty(hiveHost)) {
+      hiveHost = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
+      if (hiveHost == null) {
+        hiveHost = config.get(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST.varname);
       }
     }
-
-    LOG.warn("Your hostname, {}, resolves to a loopback address, but we couldn't find "
-        + " any external IP address!", address.getHostName());
-    LOG.warn("Set {} if you need to bind to another address.", SERVER_LISTEN_ADDRESS_KEY);
-    return address.getHostName();
+    return ServerUtils.getHostAddress(hiveHost).getHostName();
   }
 
   String getRpcChannelLogLevel() {
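
----------------------------------------------------------------------

For illustration only (not part of the patch): on a HiveServer2 host with
multiple network interfaces, an administrator could pin the address the
remote Spark driver connects back to by setting the new property in
hive-site.xml. The host name below is a placeholder:

    <property>
      <name>hive.spark.client.rpc.server.address</name>
      <!-- hypothetical internal host name; when left empty, the address is
           derived the same way as hive.server2.thrift.bind.host -->
      <value>hs2-internal.example.com</value>
    </property>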
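
Also for illustration, a minimal standalone sketch of the address-resolution
order that the new getServerAddress() follows. The conf() stub is hypothetical
and stands in for the HiveConf-backed config map used by RpcConfiguration:

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    public class AddressResolutionSketch {
      // Hypothetical stub for the RpcConfiguration config-map lookup.
      private static String conf(String key) {
        return null; // pretend neither key is set in hive-site.xml
      }

      public static void main(String[] args) throws UnknownHostException {
        // 1. Explicit Spark RPC server address, if configured.
        String host = conf("hive.spark.client.rpc.server.address");
        if (host == null || host.isEmpty()) {
          // 2. Environment override also honored by HiveServer2 itself.
          host = System.getenv("HIVE_SERVER2_THRIFT_BIND_HOST");
          if (host == null) {
            // 3. The Thrift bind host from the configuration.
            host = conf("hive.server2.thrift.bind.host");
          }
        }
        // 4. As in ServerUtils.getHostAddress(), a null or empty name
        //    falls back to the local host address.
        InetAddress addr = (host != null && !host.isEmpty())
            ? InetAddress.getByName(host)
            : InetAddress.getLocalHost();
        System.out.println("Remote Spark driver connects back to: " + addr.getHostName());
      }
    }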
