Repository: hive
Updated Branches:
  refs/heads/master 3dcd36336 -> 98982e619
HIVE-19079: Add extended query string to Spark job description (Sahil Takiar, reviewed by Aihua Xu)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/98982e61
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/98982e61
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/98982e61

Branch: refs/heads/master
Commit: 98982e6193ce05111d031bb8d54ce530ae41853f
Parents: 3dcd363
Author: Sahil Takiar <[email protected]>
Authored: Mon Apr 2 09:31:38 2018 -0700
Committer: Sahil Takiar <[email protected]>
Committed: Tue Jun 5 12:00:34 2018 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 +
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  8 +-
 .../ql/exec/spark/LocalHiveSparkClient.java     |  3 +
 .../ql/exec/spark/RemoteHiveSparkClient.java    |  7 +-
 .../hive/ql/exec/spark/TestHiveSparkClient.java | 97 ++++++++++++++++++++
 5 files changed, 115 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/98982e61/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9004894..cd425aa 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1494,6 +1494,8 @@ public class HiveConf extends Configuration {
     HIVEQUERYID("hive.query.id", "",
         "ID for query being executed (might be multiple per a session)"),
 
+    HIVESPARKJOBNAMELENGTH("hive.spark.jobname.length", 100000, "max jobname length for Hive on " +
+        "Spark queries"),
     HIVEJOBNAMELENGTH("hive.jobname.length", 50, "max jobname length"),
 
     // hive jar
http://git-wip-us.apache.org/repos/asf/hive/blob/98982e61/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 7d5e7d4..a3dcc3b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2033,7 +2033,13 @@ public class Driver implements IDriver {
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);
 
     boolean noName = StringUtils.isEmpty(conf.get(MRJobConfig.JOB_NAME));
-    int maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
+
+    int maxlen;
+    if ("spark".equals(conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE))) {
+      maxlen = conf.getIntVar(HiveConf.ConfVars.HIVESPARKJOBNAMELENGTH);
+    } else {
+      maxlen = conf.getIntVar(HiveConf.ConfVars.HIVEJOBNAMELENGTH);
+    }
     Metrics metrics = MetricsFactory.getInstance();
 
     String queryId = queryState.getQueryId();

http://git-wip-us.apache.org/repos/asf/hive/blob/98982e61/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index dff3b0f..72ff53e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -163,6 +163,9 @@ public class LocalHiveSparkClient implements HiveSparkClient {
     // Execute generated plan.
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
 
+    // We get the query name for this SparkTask and set it to the description for the associated
+    // Spark job; query names are guaranteed to be unique for each Spark job because the task id
+    // is concatenated to the end of the query name
     sc.setJobGroup("queryId = " + sparkWork.getQueryId(), DagUtils.getQueryName(jobConf));
 
     // We use Spark RDD async action to submit job as it's the only way to get jobId now.

http://git-wip-us.apache.org/repos/asf/hive/blob/98982e61/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index e8f39ae..d31a202 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 
@@ -306,7 +307,8 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
     localJars.clear();
   }
 
-  private static class JobStatusJob implements Job<Serializable> {
+  @VisibleForTesting
+  static class JobStatusJob implements Job<Serializable> {
 
     private static final long serialVersionUID = 1L;
     private final byte[] jobConfBytes;
@@ -358,6 +360,9 @@ public class RemoteHiveSparkClient implements HiveSparkClient {
           new SparkPlanGenerator(jc.sc(), null, localJobConf, localScratchDir, sparkReporter);
       SparkPlan plan = gen.generate(localSparkWork);
 
+      // We get the query name for this SparkTask and set it to the description for the associated
+      // Spark job; query names are guaranteed to be unique for each Spark job because the task id
+      // is concatenated to the end of the query name
       jc.sc().setJobGroup("queryId = " + localSparkWork.getQueryId(), DagUtils.getQueryName(localJobConf));
 
       // Execute generated plan.

http://git-wip-us.apache.org/repos/asf/hive/blob/98982e61/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java
new file mode 100644
index 0000000..239c098
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestHiveSparkClient.java
@@ -0,0 +1,97 @@
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hive.spark.client.JobContext;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestHiveSparkClient {
+
+  @Test
+  public void testSetJobGroupAndDescription() throws Exception {
+
+    HiveConf conf = new HiveConf();
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+            SQLStdHiveAuthorizerFactory.class.getName());
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
+    conf.set("spark.master", "local");
+    conf.set("spark.local.dir", Paths.get(System.getProperty("test.tmp.dir"),
+            "TestHiveSparkClient-local-dir").toString());
+
+    SessionState.start(conf);
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path tmpDir = new Path("TestSparkPlan-tmp");
+
+    IDriver driver = null;
+    JavaSparkContext sc = null;
+
+    try {
+      driver = DriverFactory.newDriver(conf);
+      Assert.assertEquals(0, driver.run("create table test (col int)").getResponseCode());
+
+      String query = "select * from test order by col";
+      driver.compile(query);
+      List<SparkTask> sparkTasks = Utilities.getSparkTasks(driver.getPlan().getRootTasks());
+      Assert.assertEquals(1, sparkTasks.size());
+
+      SparkTask sparkTask = sparkTasks.get(0);
+
+      conf.set(MRJobConfig.JOB_NAME, query);
+      JobConf jobConf = new JobConf(conf);
+
+      SparkConf sparkConf = new SparkConf();
+      sparkConf.setMaster("local");
+      sparkConf.setAppName("TestSparkPlan-app");
+      sc = new JavaSparkContext(sparkConf);
+
+      byte[] jobConfBytes = KryoSerializer.serializeJobConf(jobConf);
+      byte[] scratchDirBytes = KryoSerializer.serialize(tmpDir);
+      byte[] sparkWorkBytes = KryoSerializer.serialize(sparkTask.getWork());
+
+      RemoteHiveSparkClient.JobStatusJob job = new RemoteHiveSparkClient.JobStatusJob(jobConfBytes,
+              scratchDirBytes, sparkWorkBytes);
+
+      JobContext mockJobContext = mock(JobContext.class);
+      when(mockJobContext.sc()).thenReturn(sc);
+
+      job.call(mockJobContext);
+
+      Assert.assertTrue(sc.getLocalProperty("spark.job.description").contains(query));
+      Assert.assertTrue(sc.getLocalProperty("spark.jobGroup.id")
+              .contains(sparkTask.getWork().getQueryId()));
+    } finally {
+      if (driver != null) {
+        Assert.assertEquals(0, driver.run("drop table if exists test").getResponseCode());
+        driver.destroy();
+      }
+      if (sc != null) {
+        sc.close();
+      }
+      if (fs.exists(tmpDir)) {
+        fs.delete(tmpDir, true);
+      }
+    }
+  }
+}
