Repository: hive
Updated Branches:
  refs/heads/master 3dcd36336 -> 98982e619


HIVE-19079: Add extended query string to Spark job description (Sahil Takiar, reviewed by Aihua Xu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/98982e61
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/98982e61
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/98982e61

Branch: refs/heads/master
Commit: 98982e6193ce05111d031bb8d54ce530ae41853f
Parents: 3dcd363
Author: Sahil Takiar <[email protected]>
Authored: Mon Apr 2 09:31:38 2018 -0700
Committer: Sahil Takiar <[email protected]>
Committed: Tue Jun 5 12:00:34 2018 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 +
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  8 +-
 .../ql/exec/spark/LocalHiveSparkClient.java     |  3 +
 .../ql/exec/spark/RemoteHiveSparkClient.java    |  7 +-
 .../hive/ql/exec/spark/TestHiveSparkClient.java | 97 ++++++++++++++++++++
 5 files changed, 115 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/98982e61/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9004894..cd425aa 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1494,6 +1494,8 @@ public class HiveConf extends Configuration {
     HIVEQUERYID("hive.query.id", "",
         "ID for query being executed (might be multiple per a session)"),
 
+    HIVESPARKJOBNAMELENGTH("hive.spark.jobname.length", 100000, "max jobname length for Hive on " +
+            "Spark queries"),
     HIVEJOBNAMELENGTH("hive.jobname.length", 50, "max jobname length"),
 
     // hive jar

http://git-wip-us.apache.org/repos/asf/hive/blob/98982e61/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 7d5e7d4..a3dcc3b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2033,7 +2033,13 @@ public class Driver implements IDriver {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);
 
     boolean noName = StringUtils.isEmpty(conf.get(MRJobConfig.JOB_NAME));
-    int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
+
+    int maxlen;
+    if ("spark".equals(conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE))) {
+      maxlen = conf.getIntVar(HiveConf.ConfVars.HIVESPARKJOBNAMELENGTH);
+    } else {
+      maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
+    }
     Metrics metrics = MetricsFactory.getInstance();
 
     String queryId = queryState.getQueryId();

http://git-wip-us.apache.org/repos/asf/hive/blob/98982e61/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index dff3b0f..72ff53e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -163,6 +163,9 @@ public class LocalHiveSparkClient implements HiveSparkClient {
     // Execute generated plan.
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
 
+    // We get the query name for this SparkTask and set it to the description for the associated
+    // Spark job; query names are guaranteed to be unique for each Spark job because the task id
+    // is concatenated to the end of the query name
     sc.setJobGroup("queryId = " + sparkWork.getQueryId(), DagUtils.getQueryName(jobConf));
 
     // We use Spark RDD async action to submit job as it's the only way to get jobId now.

http://git-wip-us.apache.org/repos/asf/hive/blob/98982e61/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index e8f39ae..d31a202 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 
@@ -306,7 +307,8 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
     localJars.clear();
   }
 
-  private static class JobStatusJob implements Job<Serializable> {
+  @VisibleForTesting
+  static class JobStatusJob implements Job<Serializable> {
 
     private static final long serialVersionUID = 1L;
     private final byte[] jobConfBytes;
@@ -358,6 +360,9 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
         new SparkPlanGenerator(jc.sc(), null, localJobConf, localScratchDir, sparkReporter);
       SparkPlan plan = gen.generate(localSparkWork);
 
+      // We get the query name for this SparkTask and set it to the description for the associated
+      // Spark job; query names are guaranteed to be unique for each Spark job because the task id
+      // is concatenated to the end of the query name
       jc.sc().setJobGroup("queryId = " + localSparkWork.getQueryId(), DagUtils.getQueryName(localJobConf));
 
       // Execute generated plan.

http://git-wip-us.apache.org/repos/asf/hive/blob/98982e61/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java
new file mode 100644
index 0000000..239c098
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java
@@ -0,0 +1,97 @@
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hive.spark.client.JobContext;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestHiveSparkClient {
+
+  @Test
+  public void testSetJobGroupAndDescription() throws Exception {
+
+    HiveConf conf = new HiveConf();
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+            SQLStdHiveAuthorizerFactory.class.getName());
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
+    conf.set("spark.master", "local");
+    conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+            "TestHiveSparkClient-local-dir").toString());
+
+    SessionState.start(conf);
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path tmpDir = new Path("TestSparkPlan-tmp");
+
+    IDriver driver = null;
+    JavaSparkContext sc = null;
+
+    try {
+      driver = DriverFactory.newDriver(conf);
+      Assert.assertEquals(0, driver.run("create table test (col int)").getResponseCode());
+
+      String query = "select * from test order by col";
+      driver.compile(query);
+      List<SparkTask> sparkTasks = Utilities.getSparkTasks(driver.getPlan().getRootTasks());
+      Assert.assertEquals(1, sparkTasks.size());
+
+      SparkTask sparkTask = sparkTasks.get(0);
+
+      conf.set(MRJobConfig.JOB_NAME, query);
+      JobConf jobConf = new JobConf(conf);
+
+      SparkConf sparkConf = new SparkConf();
+      sparkConf.setMaster("local");
+      sparkConf.setAppName("TestSparkPlan-app");
+      sc = new JavaSparkContext(sparkConf);
+
+      byte[] jobConfBytes = KryoSerializer.serializeJobConf(jobConf);
+      byte[] scratchDirBytes = KryoSerializer.serialize(tmpDir);
+      byte[] sparkWorkBytes = KryoSerializer.serialize(sparkTask.getWork());
+
+      RemoteHiveSparkClient.JobStatusJob job = new RemoteHiveSparkClient.JobStatusJob(jobConfBytes,
+              scratchDirBytes, sparkWorkBytes);
+
+      JobContext mockJobContext = mock(JobContext.class);
+      when(mockJobContext.sc()).thenReturn(sc);
+
+      job.call(mockJobContext);
+
+      Assert.assertTrue(sc.getLocalProperty("spark.job.description").contains(query));
+      Assert.assertTrue(sc.getLocalProperty("spark.jobGroup.id")
+              .contains(sparkTask.getWork().getQueryId()));
+    } finally {
+      if (driver != null) {
+        Assert.assertEquals(0, driver.run("drop table if exists test").getResponseCode());
+        driver.destroy();
+      }
+      if (sc != null) {
+        sc.close();
+      }
+      if (fs.exists(tmpDir)) {
+        fs.delete(tmpDir, true);
+      }
+    }
+  }
+}

Reply via email to