Author: xuefu
Date: Thu Jul 31 01:01:32 2014
New Revision: 1614800

URL: http://svn.apache.org/r1614800
Log:
HIVE-7564: Remove some redundant code plus a bit of cleanup in SparkClient 
[Spark Branch]

Modified:
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1614800&r1=1614799&r2=1614800&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
 Thu Jul 31 01:01:32 2014
@@ -28,8 +28,7 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.JobConf;
@@ -122,22 +121,14 @@ public class SparkClient implements Seri
   }
 
   public int execute(DriverContext driverContext, SparkWork sparkWork) {
-    HiveConf hiveConf = (HiveConf)driverContext.getCtx().getConf();
+    Context ctx = driverContext.getCtx();
+    HiveConf hiveConf = (HiveConf)ctx.getConf();
     refreshLocalResources(sparkWork, hiveConf);
-
-    MapWork mapWork = sparkWork.getMapWork();
-    ReduceWork redWork = sparkWork.getReduceWork();
-
-    // TODO: need to combine spark conf and hive conf
     JobConf jobConf = new JobConf(hiveConf);
 
-    Context ctx = driverContext.getCtx();
+    // Create temporary scratch dir
     Path emptyScratchDir;
     try {
-      if (ctx == null) {
-        ctx = new Context(jobConf);
-      }
-
       emptyScratchDir = ctx.getMRTmpPath();
       FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
       fs.mkdirs(emptyScratchDir);
@@ -148,65 +139,7 @@ public class SparkClient implements Seri
       return 5;
     }
 
-    List<Path> inputPaths;
-    try {
-      inputPaths = Utilities.getInputPaths(jobConf, mapWork, emptyScratchDir, 
ctx, false);
-    } catch (Exception e2) {
-      e2.printStackTrace();
-      return -1;
-    }
-    Utilities.setInputPaths(jobConf, inputPaths);
-    Utilities.setMapWork(jobConf, mapWork, emptyScratchDir, true);
-    if (redWork != null)
-      Utilities.setReduceWork(jobConf, redWork, emptyScratchDir, true);
-
-    try {
-      Utilities.createTmpDirs(jobConf, mapWork);
-      Utilities.createTmpDirs(jobConf, redWork);
-    } catch (IOException e1) {
-      e1.printStackTrace();
-    }
-    /*
-    try {
-      Path planPath = new Path(jobConf.getWorkingDirectory(), "plan.xml");
-      System.out.println("Serializing plan to path: " + planPath);
-      OutputStream os2 = planPath.getFileSystem(jobConf).create(planPath);
-      Utilities.serializePlan(mapWork, os2, jobConf);
-    } catch (IOException e1) {
-      // TODO Auto-generated catch block
-      e1.printStackTrace();
-      return 1;
-    }
-     */  
-    /*    JavaPairRDD rdd = createRDD(sc, jobConf, mapWork);
-    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
-    HiveMapFunction mf = new HiveMapFunction(confBytes);
-    JavaPairRDD rdd2 = rdd.mapPartitionsToPair(mf);
-    if (redWork == null) {
-      rdd2.foreach(HiveVoidFunction.getInstance());
-      if (mapWork.getAliasToWork() != null) {
-        for (Operator<? extends OperatorDesc> op : 
mapWork.getAliasToWork().values()) {
-          try {
-            op.jobClose(jobConf, true);
-          } catch (HiveException e) {
-            System.out.println("Calling jobClose() failed: " + e);
-            e.printStackTrace();
-          }
-        }
-      }
-    } else {
-      JavaPairRDD rdd3 = rdd2.partitionBy(new HashPartitioner(1)); // Two 
partitions.
-      HiveReduceFunction rf = new HiveReduceFunction(confBytes);
-      JavaPairRDD rdd4 = rdd3.mapPartitionsToPair(rf);
-      rdd4.foreach(HiveVoidFunction.getInstance());
-      try {
-        redWork.getReducer().jobClose(jobConf, true);
-      } catch (HiveException e) {
-        System.out.println("Calling jobClose() failed: " + e);
-        e.printStackTrace();
-      }
-    }
-     */ 
+    // Generate Spark plan
     SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf, 
emptyScratchDir);
     SparkPlan plan;
     try {
@@ -216,6 +149,8 @@ public class SparkClient implements Seri
       return 2;
     }
 
+    // Execute generated plan.
+    // TODO: we should catch any exception and return more meaningful error 
code.
     plan.execute();
     return 0;
   }
@@ -239,20 +174,17 @@ public class SparkClient implements Seri
       HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, addedJars);
       addResources(addedJars, localJars);
     }
+
     // add plugin module jars on demand
     final String MR_JAR_PROPERTY = "tmpjars";
     // jobConf will hold all the configuration for hadoop, tez, and hive
     JobConf jobConf = new JobConf(conf);
     jobConf.setStrings(MR_JAR_PROPERTY, new String[0]);
-    // TODO update after SparkCompiler finished.
-    //    for (BaseWork work : sparkWork.getAllWork()) {
-    //      work.configureJobConf(jobConf);
-    //    }
-    sparkWork.getMapWork().configureJobConf(jobConf);
-    ReduceWork redWork = sparkWork.getReduceWork();
-    if (redWork != null) {
-      redWork.configureJobConf(jobConf);
+
+    for (BaseWork work : sparkWork.getAllWork()) {
+      work.configureJobConf(jobConf);
     }
+
     String[] newTmpJars = jobConf.getStrings(MR_JAR_PROPERTY);
     if (newTmpJars != null && newTmpJars.length > 0) {
       for (String tmpJar : newTmpJars) {


Reply via email to