Repository: hive
Updated Branches:
  refs/heads/master bad9eb666 -> 4c57ed35f


HIVE-13771 : LLAPIF: generate app ID (Sergey Shelukhin, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4c57ed35
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4c57ed35
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4c57ed35

Branch: refs/heads/master
Commit: 4c57ed35f1151269a1adbbf64db300ae2a1fa915
Parents: bad9eb6
Author: Sergey Shelukhin <[email protected]>
Authored: Tue Jun 14 19:26:31 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Wed Jun 15 14:11:49 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/coordinator/LlapCoordinator.java | 13 +++++++++----
 .../hive/ql/udf/generic/GenericUDTFGetSplits.java     | 14 +++++++-------
 2 files changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4c57ed35/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java b/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java
index f55779b..ebddfc8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java
@@ -23,6 +23,7 @@ import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -76,6 +77,8 @@ public class LlapCoordinator {
 
   private HiveConf hiveConf;
   private String clusterUser;
+  private long startTime;
+  private final AtomicInteger appIdCounter = new AtomicInteger(0);
 
   LlapCoordinator() {
   }
@@ -85,6 +88,11 @@ public class LlapCoordinator {
     // HS2 init without the knowledge of LLAP usage (or lack thereof) in the cluster.
     this.hiveConf = hiveConf;
     this.clusterUser = UserGroupInformation.getCurrentUser().getShortUserName();
+    // TODO: if two HS2s start at exactly the same time, which could happen during a coordinated
+    //       restart, they could start generating the same IDs. Should we store the startTime
+    //       somewhere like ZK? Try to randomize it a bit for now...
+    long randomBits = (long)(new Random().nextInt()) << 32;
+    this.startTime = Math.abs((System.currentTimeMillis() & (long)Integer.MAX_VALUE) | randomBits);
   }
 
   public LlapSigner getLlapSigner(final Configuration jobConf) {
@@ -105,14 +113,11 @@ public class LlapCoordinator {
   }
 
   public ApplicationId createExtClientAppId() {
-    // TODO: moved from UDTF; need JIRA to generate this properly (no dups, etc.)...
-    return ApplicationId.newInstance(Math.abs(new Random().nextInt()), 0);
     // Note that we cannot allow users to provide app ID, since providing somebody else's appId
     // would give one LLAP token (and splits) for that app ID. If we could verify it somehow
     // (YARN token? nothing we can do in an UDF), we could get it from client already running on
     // YARN. As such, the clients running on YARN will have two app IDs to be aware of.
-    // TODO: Perhaps they can give us their app id as an argument to the UDF, and we'd just append
-    // a unique string here, for easier tracking?
+    return ApplicationId.newInstance(startTime, appIdCounter.incrementAndGet());
   }
 
   public LlapTokenLocalClient getLocalTokenClient(

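For reference, a minimal standalone sketch of the ID scheme this change introduces: the high 32 bits of startTime are random, the low 31 bits come from the wall clock, so two coordinators starting in the same millisecond are still unlikely to collide, and the counter makes IDs unique within one process. The class name AppIdSchemeSketch and the string formatting are illustrative only; the real code returns a YARN ApplicationId, as in the diff above.

    import java.util.Random;
    import java.util.concurrent.atomic.AtomicInteger;

    public class AppIdSchemeSketch {
      private final long startTime;
      private final AtomicInteger appIdCounter = new AtomicInteger(0);

      AppIdSchemeSketch() {
        // Mirrors LlapCoordinator.init(): random high bits, clock low bits.
        long randomBits = (long) (new Random().nextInt()) << 32;
        this.startTime =
            Math.abs((System.currentTimeMillis() & (long) Integer.MAX_VALUE) | randomBits);
      }

      // Stands in for ApplicationId.newInstance(startTime, appIdCounter.incrementAndGet()).
      String nextAppId() {
        return "application_" + startTime + "_" + appIdCounter.incrementAndGet();
      }

      public static void main(String[] args) {
        AppIdSchemeSketch ids = new AppIdSchemeSketch();
        System.out.println(ids.nextAppId());
        System.out.println(ids.nextAppId()); // same startTime, next counter value
      }
    }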
http://git-wip-us.apache.org/repos/asf/hive/blob/4c57ed35/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index a2ad4f9..bdf254b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -334,7 +334,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
       }
 
       // See the discussion in the implementation as to why we generate app ID.
-      ApplicationId fakeApplicationId = coordinator.createExtClientAppId();
+      ApplicationId applicationId = coordinator.createExtClientAppId();
 
       // This assumes LLAP cluster owner is always the HS2 user.
       String llapUser = UserGroupInformation.getLoginUser().getShortUserName();
@@ -354,7 +354,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
         LlapTokenLocalClient tokenClient = coordinator.getLocalTokenClient(job, llapUser);
         // We put the query user, not LLAP user, into the message and token.
         Token<LlapTokenIdentifier> token = tokenClient.createToken(
-            fakeApplicationId.toString(), queryUser, true);
+            applicationId.toString(), queryUser, true);
         bos.reset();
         token.write(dos);
         tokenBytes = bos.toByteArray();
@@ -366,15 +366,15 @@ public class GenericUDTFGetSplits extends GenericUDTF {
       SignedMessage signedSvs = null;
       for (int i = 0; i < eventList.size() - 1; i++) {
         TaskSpec taskSpec = new TaskSpecBuilder().constructTaskSpec(dag, vertexName,
-              eventList.size() - 1, fakeApplicationId, i);
+              eventList.size() - 1, applicationId, i);
 
         // 2. Generate the vertex/submit information for all events.
         if (i == 0) {
           // Despite the differences in TaskSpec, the vertex spec should be the same.
-          signedSvs = createSignedVertexSpec(signer, taskSpec, fakeApplicationId, queryUser);
+          signedSvs = createSignedVertexSpec(signer, taskSpec, applicationId, queryUser);
         }
 
-        SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(fakeApplicationId,
+        SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(applicationId,
             System.currentTimeMillis(), taskSpec.getVertexParallelism(), signedSvs.message,
             signedSvs.signature);
         byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo);
@@ -426,10 +426,10 @@ public class GenericUDTFGetSplits extends GenericUDTF {
   }
 
   private SignedMessage createSignedVertexSpec(LlapSigner signer, TaskSpec taskSpec,
-      ApplicationId fakeApplicationId, String queryUser) throws IOException {
+      ApplicationId applicationId, String queryUser) throws IOException {
 
     final SignableVertexSpec.Builder svsb = Converters.convertTaskSpecToProto(
-        taskSpec, 0, fakeApplicationId.toString(), queryUser);
+        taskSpec, 0, applicationId.toString(), queryUser);
     if (signer == null) {
       SignedMessage result = new SignedMessage();
       result.message = serializeVertexSpec(svsb);

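For context on the rename: the generated ApplicationId is only ever passed on as a string (to tokenClient.createToken and Converters.convertTaskSpecToProto above), so what the rest of the pipeline sees is YARN's standard rendering. A hedged sketch, assuming stock Hadoop YARN behavior of ApplicationId.toString():

    import org.apache.hadoop.yarn.api.records.ApplicationId;

    public class AppIdStringSketch {
      public static void main(String[] args) {
        // The clusterTimestamp argument plays the role of the coordinator's startTime.
        ApplicationId appId = ApplicationId.newInstance(1466000000000L, 1);
        System.out.println(appId); // application_1466000000000_0001
      }
    }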