Repository: hive Updated Branches: refs/heads/master b8fdd1392 -> 43e713746
HIVE-18389: Print out Spark Web UI URL to the console log (Sahil Takiar, reviewed by Peter Vary) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/43e71374 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/43e71374 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/43e71374 Branch: refs/heads/master Commit: 43e713746f038fb55e4a63dba59945bbfe448471 Parents: b8fdd13 Author: Sahil Takiar <takiar.sa...@gmail.com> Authored: Thu Feb 8 11:32:26 2018 -0800 Committer: Sahil Takiar <stak...@cloudera.com> Committed: Thu Feb 8 11:32:26 2018 -0800 ---------------------------------------------------------------------- .../hive/ql/exec/spark/RemoteHiveSparkClient.java | 3 ++- .../org/apache/hive/spark/client/BaseProtocol.java | 12 ++++++++++++ .../org/apache/hive/spark/client/RemoteDriver.java | 15 +++++++++++++++ .../apache/hive/spark/client/SparkClientFactory.java | 8 +++++--- .../apache/hive/spark/client/SparkClientImpl.java | 10 +++++++++- .../apache/hive/spark/client/TestSparkClient.java | 4 +++- 6 files changed, 46 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/43e71374/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index c571d1a..93d44dc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -101,7 +101,8 @@ public class RemoteHiveSparkClient implements HiveSparkClient { } private void createRemoteClient() throws Exception { - remoteClient = SparkClientFactory.createClient(conf, hiveConf, sessionId); + remoteClient = SparkClientFactory.createClient(conf, hiveConf, sessionId, + SessionState.LogHelper.getInfoStream()); if (HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_PREWARM_ENABLED) && (SparkClientUtilities.isYarnMaster(hiveConf.get("spark.master")) || http://git-wip-us.apache.org/repos/asf/hive/blob/43e71374/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java index 0930325..7290809 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/BaseProtocol.java @@ -166,4 +166,16 @@ abstract class BaseProtocol extends RpcDispatcher { } + protected static class SparkUIWebURL<T extends Serializable> implements Serializable { + + final String UIWebURL; + + SparkUIWebURL(String UIWebURL) { + this.UIWebURL = UIWebURL; + } + + SparkUIWebURL() { + this(null); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/43e71374/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index ede8ce9..66cf339 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -25,6 +25,8 @@ import io.netty.channel.nio.NioEventLoopGroup; import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.net.InetAddress; +import java.net.URI; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -169,6 +171,14 @@ public class RemoteDriver { throw e; } + if (jc.sc().sc().uiWebUrl().isDefined()) { + // Run a reverse DNS lookup on the URL + URI uri = URI.create(jc.sc().sc().uiWebUrl().get()); + InetAddress address = InetAddress.getByName(uri.getHost()); + this.protocol.sendUIWebURL(uri.getScheme() + "://" + address.getCanonicalHostName() + ":" + + uri.getPort()); + } + synchronized (jcLock) { for (Iterator<JobWrapper<?>> it = jobQueue.iterator(); it.hasNext();) { it.next().submit(); @@ -270,6 +280,11 @@ public class RemoteDriver { clientRpc.call(new JobMetrics(jobId, sparkJobId, stageId, taskId, metrics)); } + void sendUIWebURL(String UIWebURL) { + LOG.debug("Send UIWebURL({}) to Client.", UIWebURL); + clientRpc.call(new SparkUIWebURL(UIWebURL)); + } + private void handle(ChannelHandlerContext ctx, CancelJob msg) { JobWrapper<?> job = activeJobs.get(msg.id); if (job == null || !cancelJob(job)) { http://git-wip-us.apache.org/repos/asf/hive/blob/43e71374/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java index 8abeed8..6dfc1a5 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java @@ -18,6 +18,7 @@ package org.apache.hive.spark.client; import java.io.IOException; +import java.io.PrintStream; import java.util.Map; import org.apache.hadoop.hive.common.classification.InterfaceAudience; @@ -80,10 +81,11 @@ public final class SparkClientFactory { * @param sparkConf Configuration for the remote Spark application, contains spark.* properties. * @param hiveConf Configuration for Hive, contains hive.* properties. */ - public static SparkClient createClient(Map<String, String> sparkConf, HiveConf hiveConf, String sessionId) - throws IOException, SparkException { + public static SparkClient createClient(Map<String, String> sparkConf, HiveConf hiveConf, + String sessionId, PrintStream consoleStream) + throws IOException, SparkException { Preconditions.checkState(server != null, "initialize() not called."); - return new SparkClientImpl(server, sparkConf, hiveConf, sessionId); + return new SparkClientImpl(server, sparkConf, hiveConf, sessionId, consoleStream); } } http://git-wip-us.apache.org/repos/asf/hive/blob/43e71374/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index eed8e53..78317ed 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -38,6 +38,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; +import java.io.PrintStream; import java.io.Serializable; import java.io.Writer; import java.net.URI; @@ -88,13 +89,15 @@ class SparkClientImpl implements SparkClient { private final Map<String, JobHandleImpl<?>> jobs; private final Rpc driverRpc; private final ClientProtocol protocol; + private final PrintStream consoleStream; private volatile boolean isAlive; SparkClientImpl(RpcServer rpcServer, Map<String, String> conf, HiveConf hiveConf, - String sessionid) throws IOException, SparkException { + String sessionid, PrintStream consoleStream) throws IOException { this.conf = conf; this.hiveConf = hiveConf; this.jobs = Maps.newConcurrentMap(); + this.consoleStream = consoleStream; String secret = rpcServer.createSecret(); this.driverThread = startDriver(rpcServer, sessionid, secret); @@ -621,6 +624,11 @@ class SparkClientImpl implements SparkClient { } } + private void handle(ChannelHandlerContext ctx, SparkUIWebURL msg) { + String printMsg = "Hive on Spark Session Web UI URL: " + msg.UIWebURL; + consoleStream.println(printMsg); + LOG.info(printMsg); + } } private static class AddJarJob implements Job<Serializable> { http://git-wip-us.apache.org/repos/asf/hive/blob/43e71374/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java ---------------------------------------------------------------------- diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java index 579ca07..e66caee 100644 --- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java +++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java @@ -34,6 +34,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.InputStream; +import java.io.PrintStream; import java.io.Serializable; import java.net.URI; import java.util.Arrays; @@ -302,7 +303,8 @@ public class TestSparkClient { SparkClient client = null; try { test.config(conf); - client = SparkClientFactory.createClient(conf, HIVECONF, UUID.randomUUID().toString()); + client = SparkClientFactory.createClient(conf, HIVECONF, UUID.randomUUID().toString(), + mock(PrintStream.class)); test.call(client); } finally { if (client != null) {