Author: xuefu
Date: Thu Jul 31 01:01:32 2014
New Revision: 1614800
URL: http://svn.apache.org/r1614800
Log:
HIVE-7564: Remove some redundant code plus a bit of cleanup in SparkClient [Spark Branch]
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1614800&r1=1614799&r2=1614800&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Thu Jul 31 01:01:32 2014
@@ -28,8 +28,7 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.mapred.JobConf;
@@ -122,22 +121,14 @@ public class SparkClient implements Seri
}
public int execute(DriverContext driverContext, SparkWork sparkWork) {
- HiveConf hiveConf = (HiveConf)driverContext.getCtx().getConf();
+ Context ctx = driverContext.getCtx();
+ HiveConf hiveConf = (HiveConf)ctx.getConf();
refreshLocalResources(sparkWork, hiveConf);
-
- MapWork mapWork = sparkWork.getMapWork();
- ReduceWork redWork = sparkWork.getReduceWork();
-
- // TODO: need to combine spark conf and hive conf
JobConf jobConf = new JobConf(hiveConf);
- Context ctx = driverContext.getCtx();
+ // Create temporary scratch dir
Path emptyScratchDir;
try {
- if (ctx == null) {
- ctx = new Context(jobConf);
- }
-
emptyScratchDir = ctx.getMRTmpPath();
FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
fs.mkdirs(emptyScratchDir);
@@ -148,65 +139,7 @@ public class SparkClient implements Seri
return 5;
}
- List<Path> inputPaths;
- try {
- inputPaths = Utilities.getInputPaths(jobConf, mapWork, emptyScratchDir, ctx, false);
- } catch (Exception e2) {
- e2.printStackTrace();
- return -1;
- }
- Utilities.setInputPaths(jobConf, inputPaths);
- Utilities.setMapWork(jobConf, mapWork, emptyScratchDir, true);
- if (redWork != null)
- Utilities.setReduceWork(jobConf, redWork, emptyScratchDir, true);
-
- try {
- Utilities.createTmpDirs(jobConf, mapWork);
- Utilities.createTmpDirs(jobConf, redWork);
- } catch (IOException e1) {
- e1.printStackTrace();
- }
- /*
- try {
- Path planPath = new Path(jobConf.getWorkingDirectory(), "plan.xml");
- System.out.println("Serializing plan to path: " + planPath);
- OutputStream os2 = planPath.getFileSystem(jobConf).create(planPath);
- Utilities.serializePlan(mapWork, os2, jobConf);
- } catch (IOException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- return 1;
- }
- */
- /* JavaPairRDD rdd = createRDD(sc, jobConf, mapWork);
- byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
- HiveMapFunction mf = new HiveMapFunction(confBytes);
- JavaPairRDD rdd2 = rdd.mapPartitionsToPair(mf);
- if (redWork == null) {
- rdd2.foreach(HiveVoidFunction.getInstance());
- if (mapWork.getAliasToWork() != null) {
- for (Operator<? extends OperatorDesc> op : mapWork.getAliasToWork().values()) {
- try {
- op.jobClose(jobConf, true);
- } catch (HiveException e) {
- System.out.println("Calling jobClose() failed: " + e);
- e.printStackTrace();
- }
- }
- }
- } else {
- JavaPairRDD rdd3 = rdd2.partitionBy(new HashPartitioner(1)); // Two partitions.
- HiveReduceFunction rf = new HiveReduceFunction(confBytes);
- JavaPairRDD rdd4 = rdd3.mapPartitionsToPair(rf);
- rdd4.foreach(HiveVoidFunction.getInstance());
- try {
- redWork.getReducer().jobClose(jobConf, true);
- } catch (HiveException e) {
- System.out.println("Calling jobClose() failed: " + e);
- e.printStackTrace();
- }
- }
- */
+ // Generate Spark plan
SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir);
SparkPlan plan;
try {
@@ -216,6 +149,8 @@ public class SparkClient implements Seri
return 2;
}
+ // Execute generated plan.
+ // TODO: we should catch any exception and return more meaningful error code.
plan.execute();
return 0;
}
@@ -239,20 +174,17 @@ public class SparkClient implements Seri
HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
addResources(addedJars, localJars);
}
+
// add plugin module jars on demand
final String MR_JAR_PROPERTY = "tmpjars";
// jobConf will hold all the configuration for hadoop, tez, and hive
JobConf jobConf = new JobConf(conf);
jobConf.setStrings(MR_JAR_PROPERTY, new String[0]);
- // TODO update after SparkCompiler finished.
- // for (BaseWork work : sparkWork.getAllWork()) {
- // work.configureJobConf(jobConf);
- // }
- sparkWork.getMapWork().configureJobConf(jobConf);
- ReduceWork redWork = sparkWork.getReduceWork();
- if (redWork != null) {
- redWork.configureJobConf(jobConf);
+
+ for (BaseWork work : sparkWork.getAllWork()) {
+ work.configureJobConf(jobConf);
}
+
String[] newTmpJars = jobConf.getStrings(MR_JAR_PROPERTY);
if (newTmpJars != null && newTmpJars.length > 0) {
for (String tmpJar : newTmpJars) {