Author: xuefu
Date: Tue Jul 29 10:18:00 2014
New Revision: 1614300
URL: http://svn.apache.org/r1614300
Log:
HIVE-7338: Create SparkPlanGenerator
Modified:
hive/branches/spark/conf/hive-default.xml.template
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
Modified: hive/branches/spark/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/branches/spark/conf/hive-default.xml.template?rev=1614300&r1=1614299&r2=1614300&view=diff
==============================================================================
--- hive/branches/spark/conf/hive-default.xml.template (original)
+++ hive/branches/spark/conf/hive-default.xml.template Tue Jul 29 10:18:00 2014
@@ -1,5 +1,7 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?><!--
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
@@ -14,7 +16,8 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---><configuration>
+-->
+<configuration>
<!-- WARNING!!! This file is auto generated for documentation purposes ONLY!
-->
<!-- WARNING!!! Any changes you make to this file will be ignored by Hive.
-->
<!-- WARNING!!! You must make your changes in hive-site.xml instead.
-->
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1614300&r1=1614299&r2=1614300&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java Tue Jul 29 10:18:00 2014
@@ -20,11 +20,7 @@ package org.apache.hadoop.hive.ql.exec.s
import java.util.Iterator;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
-import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
@@ -59,6 +55,7 @@ BytesWritable, BytesWritable> {
collector.clear();
while(it.hasNext() && !ExecMapper.getDone()) {
Tuple2<BytesWritable, BytesWritable> input = it.next();
+ System.out.println("mapper input: " + input._1() + ", " + input._2());
mapper.map(input._1(), input._2(), collector, Reporter.NULL);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java?rev=1614300&r1=1614299&r2=1614300&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java Tue Jul 29 10:18:00 2014
@@ -18,13 +18,16 @@
package org.apache.hadoop.hive.ql.exec.spark;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.function.VoidFunction;
+import scala.Tuple2;
+
/**
* Implementation of a voidFunction that does nothing.
*
*/
-public class HiveVoidFunction implements VoidFunction<Object> {
+public class HiveVoidFunction implements VoidFunction<Tuple2<BytesWritable, BytesWritable>> {
private static final long serialVersionUID = 1L;
private static HiveVoidFunction instance = new HiveVoidFunction();
@@ -37,7 +40,7 @@ public class HiveVoidFunction implements
}
@Override
- public void call(Object arg0) throws Exception {
+ public void call(Tuple2<BytesWritable, BytesWritable> t) throws Exception {
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1614300&r1=1614299&r2=1614300&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Tue Jul 29 10:18:00 2014
@@ -27,27 +27,17 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.io.Serializable;
import java.util.*;
@@ -84,7 +74,8 @@ public class SparkClient implements Seri
// set default spark configurations.
sparkConf.set("spark.master", SPARK_DEFAULT_MASTER);
sparkConf.set("spark.app.name", SAPRK_DEFAULT_APP_NAME);
-
+ sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ sparkConf.set("spark.default.parallelism", "1");
// load properties from spark-defaults.conf.
InputStream inputStream = null;
try {
@@ -131,9 +122,6 @@ public class SparkClient implements Seri
}
public int execute(DriverContext driverContext, SparkWork sparkWork) {
- int rc = 1;
-// System.out.println("classpath=\n"+System.getProperty("java.class.path") + "\n");
-
HiveConf hiveConf = (HiveConf)driverContext.getCtx().getConf();
refreshLocalResources(sparkWork, hiveConf);
@@ -178,7 +166,7 @@ public class SparkClient implements Seri
} catch (IOException e1) {
e1.printStackTrace();
}
-
+/*
try {
Path planPath = new Path(jobConf.getWorkingDirectory(), "plan.xml");
System.out.println("Serializing plan to path: " + planPath);
@@ -189,8 +177,8 @@ public class SparkClient implements Seri
e1.printStackTrace();
return 1;
}
-
- JavaPairRDD rdd = createRDD(sc, jobConf, mapWork);
+*/
+/* JavaPairRDD rdd = createRDD(sc, jobConf, mapWork);
byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
HiveMapFunction mf = new HiveMapFunction(confBytes);
JavaPairRDD rdd2 = rdd.mapPartitionsToPair(mf);
@@ -207,7 +195,7 @@ public class SparkClient implements Seri
}
}
} else {
- JavaPairRDD rdd3 = rdd2.partitionBy(new HashPartitioner(1/*redWork.getNumReduceTasks()*/)); // Two partitions.
+ JavaPairRDD rdd3 = rdd2.partitionBy(new HashPartitioner(1)); // Two partitions.
HiveReduceFunction rf = new HiveReduceFunction(confBytes);
JavaPairRDD rdd4 = rdd3.mapPartitionsToPair(rf);
rdd4.foreach(HiveVoidFunction.getInstance());
@@ -218,18 +206,20 @@ public class SparkClient implements Seri
e.printStackTrace();
}
}
-
+*/
+ SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir);
+ SparkPlan plan;
+ try {
+ plan = gen.generate(sparkWork);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return 2;
+ }
+
+ plan.execute();
return 0;
}
- private JavaPairRDD createRDD(JavaSparkContext sc, JobConf jobConf, MapWork mapWork) {
- Class ifClass = HiveInputFormat.class;
-
- // The mapper class is expected by the HiveInputFormat.
- jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
- return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
- }
-
private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
// add hive-exec jar
String hiveJar = conf.getJar();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1614300&r1=1614299&r2=1614300&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Tue Jul 29 10:18:00 2014
@@ -18,18 +18,57 @@
package org.apache.hadoop.hive.ql.exec.spark;
+import java.util.List;
+
import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.util.StringUtils;
public class SparkTask extends Task<SparkWork> {
private static final long serialVersionUID = 1L;
@Override
public int execute(DriverContext driverContext) {
- SparkClient client = SparkClient.getInstance(driverContext.getCtx().getConf());
- return client.execute(driverContext, getWork());
+ int rc = 1;
+ SparkClient client = null;
+ try {
+ client = SparkClient.getInstance(driverContext.getCtx().getConf());
+ rc = client.execute(driverContext, getWork());
+ } finally {
+ if (client != null) {
+ rc = close(rc);
+ }
+ }
+ return rc;
+ }
+
+ /**
+ * close will move the temp files into the right place for the fetch
+ * task. If the job has failed it will clean up the files.
+ */
+ private int close(int rc) {
+ try {
+ List<BaseWork> ws = work.getAllWork();
+ for (BaseWork w: ws) {
+ for (Operator<?> op: w.getAllOperators()) {
+ op.jobClose(conf, rc == 0);
+ }
+ }
+ } catch (Exception e) {
+ // jobClose needs to execute successfully otherwise fail task
+ if (rc == 0) {
+ rc = 3;
+ String mesg = "Job Commit failed with exception '"
+ + Utilities.getNameMessage(e) + "'";
+ console.printError(mesg, "\n" + StringUtils.stringifyException(e));
+ }
+ }
+ return rc;
}
@Override