Repository: hive
Updated Branches:
  refs/heads/master bad9eb666 -> 4c57ed35f
HIVE-13771 : LLAPIF: generate app ID (Sergey Shelukhin, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4c57ed35
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4c57ed35
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4c57ed35

Branch: refs/heads/master
Commit: 4c57ed35f1151269a1adbbf64db300ae2a1fa915
Parents: bad9eb6
Author: Sergey Shelukhin <[email protected]>
Authored: Tue Jun 14 19:26:31 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Wed Jun 15 14:11:49 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/coordinator/LlapCoordinator.java | 13 +++++++++----
 .../hive/ql/udf/generic/GenericUDTFGetSplits.java     | 14 +++++++-------
 2 files changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4c57ed35/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java b/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java
index f55779b..ebddfc8 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/coordinator/LlapCoordinator.java
@@ -23,6 +23,7 @@ import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -76,6 +77,8 @@ public class LlapCoordinator {
 
   private HiveConf hiveConf;
   private String clusterUser;
+  private long startTime;
+  private final AtomicInteger appIdCounter = new AtomicInteger(0);
 
   LlapCoordinator() {
   }
@@ -85,6 +88,11 @@ public class LlapCoordinator {
     // HS2 init without the knowledge of LLAP usage (or lack thereof) in the cluster.
     this.hiveConf = hiveConf;
     this.clusterUser = UserGroupInformation.getCurrentUser().getShortUserName();
+    // TODO: if two HS2s start at exactly the same time, which could happen during a coordinated
+    // restart, they could start generating the same IDs. Should we store the startTime
+    // somewhere like ZK? Try to randomize it a bit for now...
+    long randomBits = (long)(new Random().nextInt()) << 32;
+    this.startTime = Math.abs((System.currentTimeMillis() & (long)Integer.MAX_VALUE) | randomBits);
   }
 
   public LlapSigner getLlapSigner(final Configuration jobConf) {
@@ -105,14 +113,11 @@ public class LlapCoordinator {
   }
 
   public ApplicationId createExtClientAppId() {
-    // TODO: moved from UDTF; need JIRA to generate this properly (no dups, etc.)...
-    return ApplicationId.newInstance(Math.abs(new Random().nextInt()), 0);
     // Note that we cannot allow users to provide app ID, since providing somebody else's appId
     // would give one LLAP token (and splits) for that app ID. If we could verify it somehow
     // (YARN token? nothing we can do in an UDF), we could get it from client already running on
     // YARN. As such, the clients running on YARN will have two app IDs to be aware of.
-    // TODO: Perhaps they can give us their app id as an argument to the UDF, and we'd just append
-    // a unique string here, for easier tracking?
+    return ApplicationId.newInstance(startTime, appIdCounter.incrementAndGet());
   }
 
   public LlapTokenLocalClient getLocalTokenClient(

http://git-wip-us.apache.org/repos/asf/hive/blob/4c57ed35/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index a2ad4f9..bdf254b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -334,7 +334,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     }
 
     // See the discussion in the implementation as to why we generate app ID.
-    ApplicationId fakeApplicationId = coordinator.createExtClientAppId();
+    ApplicationId applicationId = coordinator.createExtClientAppId();
 
     // This assumes LLAP cluster owner is always the HS2 user.
     String llapUser = UserGroupInformation.getLoginUser().getShortUserName();
@@ -354,7 +354,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
       LlapTokenLocalClient tokenClient = coordinator.getLocalTokenClient(job, llapUser);
       // We put the query user, not LLAP user, into the message and token.
       Token<LlapTokenIdentifier> token = tokenClient.createToken(
-          fakeApplicationId.toString(), queryUser, true);
+          applicationId.toString(), queryUser, true);
       bos.reset();
       token.write(dos);
       tokenBytes = bos.toByteArray();
@@ -366,15 +366,15 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     SignedMessage signedSvs = null;
     for (int i = 0; i < eventList.size() - 1; i++) {
       TaskSpec taskSpec = new TaskSpecBuilder().constructTaskSpec(dag, vertexName,
-          eventList.size() - 1, fakeApplicationId, i);
+          eventList.size() - 1, applicationId, i);
 
       // 2. Generate the vertex/submit information for all events.
       if (i == 0) {
         // Despite the differences in TaskSpec, the vertex spec should be the same.
-        signedSvs = createSignedVertexSpec(signer, taskSpec, fakeApplicationId, queryUser);
+        signedSvs = createSignedVertexSpec(signer, taskSpec, applicationId, queryUser);
       }
-      SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(fakeApplicationId,
+      SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(applicationId,
           System.currentTimeMillis(), taskSpec.getVertexParallelism(), signedSvs.message,
           signedSvs.signature);
       byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo);

@@ -426,10 +426,10 @@ public class GenericUDTFGetSplits extends GenericUDTF {
   }
 
   private SignedMessage createSignedVertexSpec(LlapSigner signer, TaskSpec taskSpec,
-      ApplicationId fakeApplicationId, String queryUser) throws IOException {
+      ApplicationId applicationId, String queryUser) throws IOException {
     final SignableVertexSpec.Builder svsb = Converters.convertTaskSpecToProto(
-        taskSpec, 0, fakeApplicationId.toString(), queryUser);
+        taskSpec, 0, applicationId.toString(), queryUser);
     if (signer == null) {
       SignedMessage result = new SignedMessage();
       result.message = serializeVertexSpec(svsb);
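
Editor's note: the sketch below restates, outside of Hive, the ID-generation scheme that the new LlapCoordinator.createExtClientAppId() follows. The class name ExternalClientIdGenerator is hypothetical and the printed string is only an approximation of ApplicationId.toString(); it does not construct a real org.apache.hadoop.yarn.api.records.ApplicationId, but the bit-twiddling mirrors the committed code.

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: mirrors the scheme LlapCoordinator uses after this commit.
// The per-process "cluster timestamp" keeps the low 31 bits of wall-clock time and
// fills the high bits with a random int, so two HS2 instances started in the same
// millisecond are unlikely to collide (the TODO in the commit notes that storing the
// start time in something like ZK would be a stronger fix).
public class ExternalClientIdGenerator {

  private final long startTime;
  private final AtomicInteger appIdCounter = new AtomicInteger(0);

  public ExternalClientIdGenerator() {
    long randomBits = (long) (new Random().nextInt()) << 32;
    this.startTime = Math.abs((System.currentTimeMillis() & (long) Integer.MAX_VALUE) | randomBits);
  }

  // Each call yields a new ID; the atomic counter guarantees uniqueness within one process.
  public String nextAppIdString() {
    // Roughly the layout of ApplicationId.toString(): application_<clusterTimestamp>_<sequence>
    return String.format("application_%d_%04d", startTime, appIdCounter.incrementAndGet());
  }

  public static void main(String[] args) {
    ExternalClientIdGenerator gen = new ExternalClientIdGenerator();
    System.out.println(gen.nextAppIdString());
    System.out.println(gen.nextAppIdString());
  }
}

Within a single coordinator the AtomicInteger makes collisions impossible; across coordinators only the randomized startTime keeps them unlikely, which is the trade-off the new TODO comment calls out.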
