Repository: hive
Updated Branches:
  refs/heads/master e297a157c -> 19774029c


HIVE-14624 : LLAP: Use FQDN when submitting work to LLAP (Sergey Shelukhin, 
reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/19774029
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/19774029
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/19774029

Branch: refs/heads/master
Commit: 19774029c4c1d90982354c36840bb485d74faaf1
Parents: e297a15
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Tue Sep 20 11:30:49 2016 -0700
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Tue Sep 20 11:30:59 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/conf/HiveConf.java    |  3 +++
 .../java/org/apache/hadoop/hive/llap/LlapUtil.java    | 12 ++++++++++++
 .../apache/hadoop/hive/llap/LlapBaseInputFormat.java  |  6 +++---
 .../hive/llap/tezplugins/LlapTaskCommunicator.java    | 14 ++++++++++----
 .../llap/tezplugins/TestLlapTaskCommunicator.java     |  5 +++++
 5 files changed, 33 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/19774029/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 301159e..ccdfca6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -393,6 +393,7 @@ public class HiveConf extends Configuration {
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_VALIDATE_ACLS.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_LOGGER.varname);
+    llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_AM_USE_FQDN.varname);
   }
 
   /**
@@ -2909,6 +2910,8 @@ public class HiveConf extends Configuration {
       new TimeValidator(TimeUnit.MILLISECONDS),
       "Amount of time to wait on connection failures to the AM from an LLAP 
daemon before\n" +
       "considering the AM to be dead.", 
"llap.am.liveness.connection.timeout-millis"),
+    LLAP_DAEMON_AM_USE_FQDN("hive.llap.am.use.fqdn", false,
+        "Whether to use FQDN of the AM machine when submitting work to LLAP."),
     // Not used yet - since the Writable RPC engine does not support this 
policy.
     LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS(
       "hive.llap.am.liveness.connection.sleep.between.retries.ms", "2000ms",

http://git-wip-us.apache.org/repos/asf/hive/blob/19774029/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java 
b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index 0c04d9d..8352943 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -14,6 +14,8 @@
 package org.apache.hadoop.hive.llap;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -25,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto.Builder;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
@@ -180,4 +183,13 @@ public class LlapUtil {
       return sb.toString();
     }
   }
+
+  public static String getAmHostNameFromAddress(InetSocketAddress address, 
Configuration conf) {
+    if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_DAEMON_AM_USE_FQDN)) 
{
+      return address.getHostName();
+    }
+    InetAddress ia = address.getAddress();
+    // getCanonicalHostName would either return FQDN, or an IP.
+    return (ia == null) ? address.getHostName() : ia.getCanonicalHostName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/19774029/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git 
a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java 
b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 7dae4fc..288a8eb 100644
--- 
a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ 
b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -160,7 +160,7 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
     SubmitWorkRequestProto request = constructSubmitWorkRequestProto(
         submitWorkInfo, llapSplit.getSplitNum(), attemptNum, 
llapClient.getAddress(),
         submitWorkInfo.getToken(), llapSplit.getFragmentBytes(),
-        llapSplit.getFragmentBytesSignature());
+        llapSplit.getFragmentBytesSignature(), job);
     llapClient.submitWork(request, host, llapSubmitPort);
 
     Socket socket = new Socket(host, serviceInstance.getOutputFormatPort());
@@ -290,7 +290,7 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
 
   private SubmitWorkRequestProto 
constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
       int taskNum, int attemptNum, InetSocketAddress address, 
Token<JobTokenIdentifier> token,
-      byte[] fragmentBytes, byte[] fragmentBytesSignature) throws IOException {
+      byte[] fragmentBytes, byte[] fragmentBytesSignature, JobConf job) throws 
IOException {
     ApplicationId appId = submitWorkInfo.getFakeAppId();
 
     // This works, assuming the executor is running within YARN.
@@ -325,7 +325,7 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
     builder.setFragmentNumber(taskNum);
     builder.setAttemptNumber(attemptNum);
     builder.setContainerIdString(containerId.toString());
-    builder.setAmHost(address.getHostName());
+    builder.setAmHost(LlapUtil.getAmHostNameFromAddress(address, job));
     builder.setAmPort(address.getPort());
     builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
     builder.setFragmentRuntimeInfo(runtimeInfo.build());

http://git-wip-us.apache.org/repos/asf/hive/blob/19774029/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index d1d2ad4..7dd778d 100644
--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
@@ -108,6 +109,7 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
   private final LlapTaskUmbilicalProtocol umbilical;
   private final Token<LlapTokenIdentifier> token;
   private final String user;
+  private String amHost;
 
   // These two structures track the list of known nodes, and the list of nodes 
which are sending in keep-alive heartbeats.
   // Primarily for debugging purposes a.t.m, since there's some unexplained 
TASK_TIMEOUTS which are currently being observed.
@@ -218,9 +220,9 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
 
       server.start();
       this.address = NetUtils.getConnectAddress(server);
-      LOG.info(
-          "Started LlapUmbilical: " + umbilical.getClass().getName() + " at 
address: " + address +
-              " with numHandlers=" + numHandlers);
+      this.amHost = LlapUtil.getAmHostNameFromAddress(address, conf);
+      LOG.info("Started LlapUmbilical: " + umbilical.getClass().getName() + " 
at address: "
+          + address + " with numHandlers=" + numHandlers + " using the host 
name " + amHost);
     } catch (IOException e) {
       throw new TezUncheckedException(e);
     }
@@ -610,7 +612,7 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId());
     builder.setAttemptNumber(taskSpec.getTaskAttemptID().getId());
     builder.setContainerIdString(containerId.toString());
-    builder.setAmHost(getAddress().getHostName());
+    builder.setAmHost(getAmHostString());
     builder.setAmPort(getAddress().getPort());
 
     Preconditions.checkState(currentQueryIdentifierProto.getDagIndex() ==
@@ -842,4 +844,8 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
         
.setAppAttemptNumber(getContext().getApplicationAttemptId().getAttemptId())
         .build();
   }
+
+  public String getAmHostString() {
+    return amHost;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/19774029/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git 
a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
 
b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
index 0f28f70..5efe7c6 100644
--- 
a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
+++ 
b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
@@ -417,6 +417,11 @@ public class TestLlapTaskCommunicator {
     public InetSocketAddress getAddress() {
       return InetSocketAddress.createUnresolved("localhost", 15001);
     }
+
+    @Override
+    public String getAmHostString() {
+      return "localhost";
+    }
   }
 
 }

Reply via email to