Author: xuefu
Date: Tue Jul 29 10:18:00 2014
New Revision: 1614300

URL: http://svn.apache.org/r1614300
Log:
HIVE-7338: Create SparkPlanGenerator

Modified:
    hive/branches/spark/conf/hive-default.xml.template
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java

Modified: hive/branches/spark/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/branches/spark/conf/hive-default.xml.template?rev=1614300&r1=1614299&r2=1614300&view=diff
==============================================================================
--- hive/branches/spark/conf/hive-default.xml.template (original)
+++ hive/branches/spark/conf/hive-default.xml.template Tue Jul 29 10:18:00 2014
@@ -1,5 +1,7 @@
 <?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?><!--
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
@@ -14,7 +16,8 @@
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
---><configuration>
+-->
+<configuration>
   <!-- WARNING!!! This file is auto generated for documentation purposes ONLY! -->
   <!-- WARNING!!! Any changes you make to this file will be ignored by Hive.   -->
   <!-- WARNING!!! You must make your changes in hive-site.xml instead.         -->

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1614300&r1=1614299&r2=1614300&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java Tue Jul 29 10:18:00 2014
@@ -20,11 +20,7 @@ package org.apache.hadoop.hive.ql.exec.s
 
 import java.util.Iterator;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
-import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
@@ -59,6 +55,7 @@ BytesWritable, BytesWritable> {
     collector.clear();
     while(it.hasNext() && !ExecMapper.getDone()) {
       Tuple2<BytesWritable, BytesWritable> input = it.next();
+      System.out.println("mapper input: " + input._1() + ", " + input._2());
       mapper.map(input._1(), input._2(), collector, Reporter.NULL);
     }
     

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java?rev=1614300&r1=1614299&r2=1614300&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java Tue Jul 29 10:18:00 2014
@@ -18,13 +18,16 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.function.VoidFunction;
 
+import scala.Tuple2;
+
 /**
  * Implementation of a voidFunction that does nothing.
  *
  */
-public class HiveVoidFunction implements VoidFunction<Object> {
+public class HiveVoidFunction implements VoidFunction<Tuple2<BytesWritable, BytesWritable>> {
   private static final long serialVersionUID = 1L;
   
   private static HiveVoidFunction instance = new HiveVoidFunction();
@@ -37,7 +40,7 @@ public class HiveVoidFunction implements
   }
 
   @Override
-  public void call(Object arg0) throws Exception {
+  public void call(Tuple2<BytesWritable, BytesWritable> t) throws Exception {
   }
 
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1614300&r1=1614299&r2=1614300&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Tue Jul 29 10:18:00 2014
@@ -27,27 +27,17 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.spark.HashPartitioner;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.*;
 
@@ -84,7 +74,8 @@ public class SparkClient implements Seri
     // set default spark configurations.
     sparkConf.set("spark.master", SPARK_DEFAULT_MASTER);
     sparkConf.set("spark.app.name", SAPRK_DEFAULT_APP_NAME);
-
+    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+    sparkConf.set("spark.default.parallelism",  "1");
     // load properties from spark-defaults.conf.
     InputStream inputStream = null;
     try {
@@ -131,9 +122,6 @@ public class SparkClient implements Seri
   }
 
   public int execute(DriverContext driverContext, SparkWork sparkWork) {
-    int rc = 1;
-//    System.out.println("classpath=\n"+System.getProperty("java.class.path") + "\n");
-
     HiveConf hiveConf = (HiveConf)driverContext.getCtx().getConf();
     refreshLocalResources(sparkWork, hiveConf);
 
@@ -178,7 +166,7 @@ public class SparkClient implements Seri
     } catch (IOException e1) {
       e1.printStackTrace();
     }
-
+/*
     try {
       Path planPath = new Path(jobConf.getWorkingDirectory(), "plan.xml");
       System.out.println("Serializing plan to path: " + planPath);
@@ -189,8 +177,8 @@ public class SparkClient implements Seri
       e1.printStackTrace();
       return 1;
     }
-    
-    JavaPairRDD rdd = createRDD(sc, jobConf, mapWork);
+*/  
+/*    JavaPairRDD rdd = createRDD(sc, jobConf, mapWork);
     byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
     HiveMapFunction mf = new HiveMapFunction(confBytes);
     JavaPairRDD rdd2 = rdd.mapPartitionsToPair(mf);
@@ -207,7 +195,7 @@ public class SparkClient implements Seri
         }
       }
     } else {
-      JavaPairRDD rdd3 = rdd2.partitionBy(new HashPartitioner(1/*redWork.getNumReduceTasks()*/)); // Two partitions.
+      JavaPairRDD rdd3 = rdd2.partitionBy(new HashPartitioner(1)); // Two partitions.
       HiveReduceFunction rf = new HiveReduceFunction(confBytes);
       JavaPairRDD rdd4 = rdd3.mapPartitionsToPair(rf);
       rdd4.foreach(HiveVoidFunction.getInstance());
@@ -218,18 +206,20 @@ public class SparkClient implements Seri
         e.printStackTrace();
       }
     }
-    
+*/ 
+    SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir);
+    SparkPlan plan;
+    try {
+      plan = gen.generate(sparkWork);
+    } catch (Exception e) {
+      e.printStackTrace();
+      return 2;
+    }
+
+    plan.execute();
     return 0;
   }
   
-  private JavaPairRDD createRDD(JavaSparkContext sc, JobConf jobConf, MapWork mapWork) {
-    Class ifClass = HiveInputFormat.class;
-
-    // The mapper class is expected by the HiveInputFormat.
-    jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
-    return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
-  }
-
   private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
     // add hive-exec jar
     String hiveJar = conf.getJar();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1614300&r1=1614299&r2=1614300&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Tue Jul 29 10:18:00 2014
@@ -18,18 +18,57 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.util.List;
+
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.util.StringUtils;
 
 public class SparkTask extends Task<SparkWork> {
   private static final long serialVersionUID = 1L;
 
   @Override
   public int execute(DriverContext driverContext) {
-    SparkClient client = SparkClient.getInstance(driverContext.getCtx().getConf());
-    return client.execute(driverContext, getWork());
+    int rc = 1;
+    SparkClient client = null;
+    try {
+      client = SparkClient.getInstance(driverContext.getCtx().getConf());
+      rc = client.execute(driverContext, getWork());
+    } finally {
+      if (client != null) {
+        rc = close(rc);
+      }
+    }
+    return rc;
+  }
+  
+  /**
+   * close will move the temp files into the right place for the fetch
+   * task. If the job has failed it will clean up the files.
+   */
+  private int close(int rc) {
+    try {
+      List<BaseWork> ws = work.getAllWork();
+      for (BaseWork w: ws) {
+        for (Operator<?> op: w.getAllOperators()) {
+          op.jobClose(conf, rc == 0);
+        }
+      }
+    } catch (Exception e) {
+      // jobClose needs to execute successfully otherwise fail task
+      if (rc == 0) {
+        rc = 3;
+        String mesg = "Job Commit failed with exception '"
+          + Utilities.getNameMessage(e) + "'";
+        console.printError(mesg, "\n" + StringUtils.stringifyException(e));
+      }
+    }
+    return rc;
   }
 
   @Override


Reply via email to