Author: xuefu
Date: Wed Jul  9 14:16:49 2014
New Revision: 1609166

URL: http://svn.apache.org/r1609166
Log:
HIVE-7370: Initial ground work for Hive on Spark [Spark branch]

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SparkCompiler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
Modified:
    hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/spark/pom.xml
    hive/branches/spark/ql/pom.xml
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java

Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1609166&r1=1609165&r2=1609166&view=diff
==============================================================================
--- hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Jul  9 14:16:49 2014
@@ -995,7 +995,7 @@ public class HiveConf extends Configurat
     HIVE_DECODE_PARTITION_NAME("hive.decode.partition.name", false),
 
     HIVE_EXECUTION_ENGINE("hive.execution.engine", "mr",
-        new StringsValidator("mr", "tez")),
+        new StringsValidator("mr", "tez", "spark")),
     HIVE_JAR_DIRECTORY("hive.jar.directory", null),
     HIVE_USER_INSTALL_DIR("hive.user.install.directory", "hdfs:///user/"),
 

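For reference, a minimal usage sketch (not part of this patch; the class name is
hypothetical) showing how the new "spark" value can be selected, relying only on
the HIVE_EXECUTION_ENGINE entry above and the Configuration API HiveConf inherits:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class EngineSelectionSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // "spark" is now accepted alongside "mr" and "tez".
        conf.set("hive.execution.engine", "spark");
        System.out.println(
            HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE));
      }
    }
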
Modified: hive/branches/spark/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/pom.xml?rev=1609166&r1=1609165&r2=1609166&view=diff
==============================================================================
--- hive/branches/spark/pom.xml (original)
+++ hive/branches/spark/pom.xml Wed Jul  9 14:16:49 2014
@@ -105,7 +105,7 @@
     <commons-lang3.version>3.1</commons-lang3.version>
     <commons-logging.version>1.1.3</commons-logging.version>
     <derby.version>10.10.1.1</derby.version>
-    <guava.version>11.0.2</guava.version>
+    <guava.version>14.0.1</guava.version>
     <groovy.version>2.1.6</groovy.version>
     <hadoop-20.version>0.20.2</hadoop-20.version>
     <hadoop-20S.version>1.2.1</hadoop-20S.version>
@@ -145,6 +145,9 @@
     <slf4j.version>1.7.5</slf4j.version>
     <ST4.version>4.0.4</ST4.version>
     <tez.version>0.4.0-incubating</tez.version>
+    <spark.version>1.0.1-SNAPSHOT</spark.version>
+    <scala.binary.version>2.10</scala.binary.version>
+    <scala.version>2.10.4</scala.version>
     <tempus-fugit.version>1.1</tempus-fugit.version>
     <snappy.version>0.2</snappy.version>
     <wadl-resourcedoc-doclet.version>1.4</wadl-resourcedoc-doclet.version>
@@ -1014,6 +1017,16 @@
             <artifactId>hadoop-minicluster</artifactId>
             <version>${hadoop-23.version}</version>
           </dependency>
+          <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+          </dependency>
         </dependencies>
       </dependencyManagement>
     </profile>

Modified: hive/branches/spark/ql/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/pom.xml?rev=1609166&r1=1609165&r2=1609166&view=diff
==============================================================================
--- hive/branches/spark/ql/pom.xml (original)
+++ hive/branches/spark/ql/pom.xml Wed Jul  9 14:16:49 2014
@@ -327,6 +327,13 @@
        </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+      <optional>true</optional>
+    </dependency>
   </dependencies>
 
   <profiles>
@@ -533,7 +540,7 @@
                   <include>org.iq80.snappy:snappy</include>
                   <include>org.codehaus.jackson:jackson-core-asl</include>
                   <include>org.codehaus.jackson:jackson-mapper-asl</include>
-                  <include>com.google.guava:guava</include>
+                  <!--include>com.google.guava:guava</include-->
                 </includes>
               </artifactSet>
               <relocations>

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1609166&r1=1609165&r2=1609166&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java Wed Jul  9 14:16:49 2014
@@ -532,7 +532,12 @@ public class Context {
    * Today this translates into running hadoop jobs locally
    */
   public boolean isLocalOnlyExecutionMode() {
-    return ShimLoader.getHadoopShims().isLocalMode(conf);
+    // TODO: hack to allow spark to run in a cluster mode. Without this, depending on
+    // the user's local hadoop settings, true may be returned, which causes the plan to be
+    // stored in a local path. This will be addressed in subsequent patches, where false is
+    // returned for spark always.
+    return false;
+//    return ShimLoader.getHadoopShims().isLocalMode(conf);
   }
 
   public List<HiveLock> getHiveLocks() {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1609166&r1=1609165&r2=1609166&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Wed Jul  9 14:16:49 2014
@@ -1209,8 +1209,9 @@ public class Driver implements CommandPr
         }
       }
 
-      int jobs = Utilities.getMRTasks(plan.getRootTasks()).size()
-        + Utilities.getTezTasks(plan.getRootTasks()).size();
+      int jobs = Utilities.getMRTasks(plan.getRootTasks()).size() +
+                 Utilities.getTezTasks(plan.getRootTasks()).size() +
+                 Utilities.getSparkTasks(plan.getRootTasks()).size();
       if (jobs > 0) {
         console.printInfo("Query ID = " + plan.getQueryId());
         console.printInfo("Total jobs = " + jobs);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1609166&r1=1609165&r2=1609166&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Jul  9 14:16:49 2014
@@ -110,6 +110,7 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -305,15 +306,18 @@ public final class Utilities {
     InputStream in = null;
     try {
       path = getPlanPath(conf, name);
+      LOG.info("PLAN PATH = " + path);
       assert path != null;
       if (!gWorkMap.containsKey(path)) {
         Path localPath;
         if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
           localPath = path;
         } else {
+          LOG.info("***************non-local mode***************");
           localPath = new Path(name);
         }
-
+        localPath = path;
+        LOG.info("local path = " + localPath);
         if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) {
           LOG.debug("Loading plan from string: "+path.toUri().getPath());
           String planString = conf.get(path.toUri().getPath());
@@ -325,7 +329,9 @@ public final class Utilities {
           in = new ByteArrayInputStream(planBytes);
           in = new InflaterInputStream(in);
         } else {
-          in = new FileInputStream(localPath.toUri().getPath());
+          LOG.info("Open file to read in plan: " + localPath);
+//          in = new FileInputStream(localPath.toUri().getPath());
+          in = localPath.getFileSystem(conf).open(localPath);
         }
 
         if(MAP_PLAN_NAME.equals(name)){
@@ -357,6 +363,7 @@ public final class Utilities {
       return gWork;
     } catch (FileNotFoundException fnf) {
       // happens. e.g.: no reduce work.
+      LOG.info("File not found: " + fnf.getMessage());
       LOG.info("No plan file found: "+path);
       return null;
     } catch (Exception e) {
@@ -2367,6 +2374,26 @@ public final class Utilities {
     }
   }
 
+  public static List<SparkTask> getSparkTasks(List<Task<? extends Serializable>> tasks) {
+    List<SparkTask> sparkTasks = new ArrayList<SparkTask>();
+    if (tasks != null) {
+      getSparkTasks(tasks, sparkTasks);
+    }
+    return sparkTasks;
+  }
+
+  private static void getSparkTasks(List<Task<? extends Serializable>> tasks, List<SparkTask> sparkTasks) {
+    for (Task<? extends Serializable> task : tasks) {
+      if (task instanceof SparkTask && !sparkTasks.contains(task)) {
+        sparkTasks.add((SparkTask) task);
+      }
+
+      if (task.getDependentTasks() != null) {
+        getSparkTasks(task.getDependentTasks(), sparkTasks);
+      }
+    }
+  }
+
   public static List<ExecDriver> getMRTasks(List<Task<? extends Serializable>> tasks) {
     List<ExecDriver> mrTasks = new ArrayList<ExecDriver>();
     if (tasks != null) {
@@ -3408,7 +3435,7 @@ public final class Utilities {
       if (origUmask != null) {
         conf.set("fs.permissions.umask-mode", origUmask);
       } else {
-        conf.unset("fs.permissions.umask-mode");
+        //conf.unset("fs.permissions.umask-mode");
       }
     }
 

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1609166&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java Wed Jul  9 14:16:49 2014
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+import scala.Tuple2;
+
+public class HiveMapFunction implements PairFlatMapFunction<Iterator<Tuple2<BytesWritable, BytesWritable>>,
+BytesWritable, BytesWritable> {
+  private static final long serialVersionUID = 1L;
+  
+  private transient ExecMapper mapper;
+  private transient SparkCollector collector;
+  private transient JobConf jobConf;
+
+  private byte[] buffer;
+
+  public HiveMapFunction(byte[] buffer) {
+    this.buffer = buffer;
+  }
+
+  @Override
+  public Iterable<Tuple2<BytesWritable, BytesWritable>> 
+  call(Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
+    if (jobConf == null) {
+      jobConf = KryoSerializer.deserializeJobConf(this.buffer);
+      Path path = new Path(jobConf.getWorkingDirectory(), "plan.xml");
+      FSDataInputStream in = path.getFileSystem(jobConf).open(path);
+      MapWork mw = Utilities.deserializePlan(in, MapWork.class, jobConf);
+      
+      Utilities.setMapWork(jobConf, mw);
+      mapper = new ExecMapper();
+      mapper.configure(jobConf);
+      collector = new SparkCollector();
+    }
+
+    collector.clear();
+    while(it.hasNext() && !ExecMapper.getDone()) {
+      Tuple2<BytesWritable, BytesWritable> input = it.next();
+      mapper.map(input._1(), input._2(), collector, Reporter.NULL);
+    }
+    
+    mapper.close();
+    return collector.getResult();
+  }
+  
+}
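
A rough sketch of how this function is meant to be wired in, modeled on
SparkClient.execute() later in this patch; the helper class below is hypothetical,
assumes the same org.apache.hadoop.hive.ql.exec.spark package, and uses raw RDD
types just as the patch does:

    package org.apache.hadoop.hive.ql.exec.spark;

    import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
    import org.apache.hadoop.hive.ql.io.HiveInputFormat;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class MapSideSketch {
      // Feeds each Spark partition of a Hadoop-backed RDD through ExecMapper
      // via HiveMapFunction; assumes the map-work plan has already been
      // serialized to the job's working directory, as SparkClient does.
      @SuppressWarnings({"rawtypes", "unchecked"})
      static JavaPairRDD mapSide(JavaSparkContext sc, JobConf jobConf) {
        // The mapper class is expected by HiveInputFormat (see SparkClient.createRDD).
        jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
        Class ifClass = HiveInputFormat.class;
        JavaPairRDD input = sc.hadoopRDD(jobConf, ifClass,
            WritableComparable.class, Writable.class);
        byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
        return input.mapPartitionsToPair(new HiveMapFunction(confBytes));
      }
    }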

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1609166&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java Wed Jul  9 14:16:49 2014
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+
+import scala.Tuple2;
+
+public class HiveReduceFunction implements PairFlatMapFunction<Iterator<Tuple2<BytesWritable,BytesWritable>>,
+BytesWritable, BytesWritable> {
+  private static final long serialVersionUID = 1L;
+  
+  private transient ExecReducer reducer;
+  private transient SparkCollector collector;
+  private transient JobConf jobConf;
+
+  private byte[] buffer;
+
+  public HiveReduceFunction(byte[] buffer) {
+    this.buffer = buffer;
+  }
+
+  @Override
+  public Iterable<Tuple2<BytesWritable, BytesWritable>> call(Iterator<Tuple2<BytesWritable,BytesWritable>> it)
+      throws Exception {
+    if (jobConf == null) {
+      jobConf = KryoSerializer.deserializeJobConf(this.buffer);
+      jobConf.set("mapred.reducer.class", ExecReducer.class.getName());      
+ 
+      reducer = new ExecReducer();
+      reducer.configure(jobConf);
+      collector = new SparkCollector();
+    }
+
+    collector.clear();
+    Map<BytesWritable, List<BytesWritable>> clusteredRows = 
+        new HashMap<BytesWritable, List<BytesWritable>>();
+    while (it.hasNext()) {
+      Tuple2<BytesWritable, BytesWritable> input = it.next();
+      BytesWritable key = input._1();
+      BytesWritable value = input._2();
+      System.out.println("reducer row: " + key + "/" + value);
+      // cluster the input according to key.
+      List<BytesWritable> valueList = clusteredRows.get(key);
+      if (valueList == null) {
+        valueList = new ArrayList<BytesWritable>();
+        clusteredRows.put(key, valueList);
+      }
+      valueList.add(value);
+    }
+    
+    for (Map.Entry<BytesWritable, List<BytesWritable>> entry : clusteredRows.entrySet()) {
+      // pass on the clustered result to the reducer operator tree.
+      reducer.reduce(entry.getKey(), entry.getValue().iterator(), collector, Reporter.NULL);
+    }
+    
+    reducer.close();
+    return collector.getResult();
+  }
+  
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java?rev=1609166&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java Wed Jul  9 14:16:49 2014
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.spark.api.java.function.VoidFunction;
+
+/**
+ * Implementation of a VoidFunction that does nothing.
+ *
+ */
+public class HiveVoidFunction implements VoidFunction<Object> {
+  private static final long serialVersionUID = 1L;
+  
+  private static HiveVoidFunction instance = new HiveVoidFunction();
+
+  public static HiveVoidFunction getInstance() {
+    return instance;
+  }
+  
+  private HiveVoidFunction() {
+  }
+
+  @Override
+  public void call(Object arg0) throws Exception {
+  }
+
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java?rev=1609166&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java Wed Jul  9 14:16:49 2014
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.mapred.JobConf;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public class KryoSerializer {
+  private static final Kryo kryo = Utilities.runtimeSerializationKryo.get();
+
+  static {
+    kryo.register(ExecMapper.class);
+  }
+
+  public static byte[] serialize(Object object) {
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    Output output = new Output(stream);
+
+    kryo.writeObject(output, object);
+
+    output.close(); // close() also calls flush()
+    return stream.toByteArray();
+  }
+
+  public static <T> T deserialize(byte[] buffer,Class<T> clazz)  {
+    return kryo.readObject(new Input(new ByteArrayInputStream(buffer)), clazz);
+  }
+
+  public static byte[] serializeJobConf(JobConf jobConf) {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try {
+      jobConf.write(new DataOutputStream(out));
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+      return null;
+    } finally {
+      try {
+        out.close();
+      } catch (IOException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+    }
+
+    return out.toByteArray();
+
+  }
+  
+  public static JobConf deserializeJobConf(byte[] buffer) {
+    JobConf conf = new JobConf();
+    try {
+      conf.readFields(new DataInputStream(new ByteArrayInputStream(buffer)));
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+      return null;
+    }
+    return conf;
+  }
+
+}
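
A small round-trip sketch of how the helpers above are intended to be used; the
class name is hypothetical and it assumes the same
org.apache.hadoop.hive.ql.exec.spark package so KryoSerializer resolves without
an import:

    package org.apache.hadoop.hive.ql.exec.spark;

    import org.apache.hadoop.mapred.JobConf;

    public class KryoSerializerSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        conf.set("hive.execution.engine", "spark");

        // The JobConf is shipped to Spark executors as a byte[] and rebuilt there.
        byte[] bytes = KryoSerializer.serializeJobConf(conf);
        JobConf copy = KryoSerializer.deserializeJobConf(bytes);
        System.out.println(copy.get("hive.execution.engine"));  // prints "spark"
      }
    }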

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1609166&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Wed Jul  9 14:16:49 2014
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class SparkClient implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private static String masterUrl = "local";
+
+  private static String appName = "Hive-Spark";
+
+  private static String sparkHome = "/home/xzhang/apache/spark";
+  
+  private static int reducerCount = 5;
+  
+  private static String execMem = "1g";
+  private static String execJvmOpts = "";
+
+  static {
+    String envSparkHome = System.getenv("SPARK_HOME");
+    if (envSparkHome != null) {
+      sparkHome = envSparkHome;
+    }
+
+    String envMaster = System.getenv("MASTER");
+    if (envMaster != null) {
+      masterUrl = envMaster;
+    }
+
+    String reducers = System.getenv("REDUCERS");
+    if (reducers != null) {
+      reducerCount = Integer.valueOf(reducers);
+    }
+
+    String mem = System.getenv("spark_executor_memory");
+    if (mem != null) {
+      execMem = mem;
+    }
+
+    String jopts = System.getenv("spark_executor_extraJavaOptions");
+    if (jopts != null) {
+      execJvmOpts = jopts;
+    }
+
+  }
+  
+  private static SparkClient client = new SparkClient();
+  
+  public static SparkClient getInstance() {
+    return client;
+  }
+  
+  private JavaSparkContext sc;
+
+  private SparkClient() {
+    SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(masterUrl).setSparkHome(sparkHome);
+    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+    sparkConf.set("spark.default.parallelism", "1");
+    sparkConf.set("spark.executor.memory", execMem);
+    sparkConf.set("spark.executor.extraJavaOptions", execJvmOpts);
+    sc = new JavaSparkContext(sparkConf);
+    addJars();
+  }
+
+  public int execute(DriverContext driverContext, SparkWork sparkWork) {
+    int rc = 1;
+//    System.out.println("classpath=\n"+System.getProperty("java.class.path") + "\n");
+    MapWork mapWork = sparkWork.getMapWork();
+    ReduceWork redWork = sparkWork.getReduceWork();
+    
+    Configuration hiveConf = driverContext.getCtx().getConf();
+    // TODO: need to combine spark conf and hive conf
+    JobConf jobConf = new JobConf(hiveConf);
+
+    Context ctx = driverContext.getCtx();
+    Path emptyScratchDir;
+    try {
+      if (ctx == null) {
+        ctx = new Context(jobConf);
+      }
+
+      emptyScratchDir = ctx.getMRTmpPath();
+      FileSystem fs = emptyScratchDir.getFileSystem(jobConf);
+      fs.mkdirs(emptyScratchDir);
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.err.println("Error launching map-reduce job" + "\n"
+        + org.apache.hadoop.util.StringUtils.stringifyException(e));
+      return 5;
+    }
+
+    List<Path> inputPaths;
+    try {
+      inputPaths = Utilities.getInputPaths(jobConf, mapWork, emptyScratchDir, ctx);
+    } catch (Exception e2) {
+      e2.printStackTrace();
+      return -1;
+    }
+    Utilities.setInputPaths(jobConf, inputPaths);
+    Utilities.setMapWork(jobConf, mapWork, emptyScratchDir, true);
+    if (redWork != null)
+      Utilities.setReduceWork(jobConf, redWork, emptyScratchDir, true);
+
+    try {
+      Utilities.createTmpDirs(jobConf, mapWork);
+      Utilities.createTmpDirs(jobConf, redWork);
+    } catch (IOException e1) {
+      e1.printStackTrace();
+    }
+
+    try {
+      Path planPath = new Path(jobConf.getWorkingDirectory(), "plan.xml");
+      System.out.println("Serializing plan to path: " + planPath);
+      OutputStream os2 = planPath.getFileSystem(jobConf).create(planPath);
+      Utilities.serializePlan(mapWork, os2, jobConf);
+    } catch (IOException e1) {
+      // TODO Auto-generated catch block
+      e1.printStackTrace();
+      return 1;
+    }
+    
+    JavaPairRDD rdd = createRDD(sc, jobConf, mapWork);
+    byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+    HiveMapFunction mf = new HiveMapFunction(confBytes);
+    JavaPairRDD rdd2 = rdd.mapPartitionsToPair(mf);
+    if (redWork == null) {
+      rdd2.foreach(HiveVoidFunction.getInstance());
+      if (mapWork.getAliasToWork() != null) {
+        for (Operator<? extends OperatorDesc> op : mapWork.getAliasToWork().values()) {
+          try {
+            op.jobClose(jobConf, true);
+          } catch (HiveException e) {
+            System.out.println("Calling jobClose() failed: " + e);
+            e.printStackTrace();
+          }
+        }
+      }
+    } else {
+      JavaPairRDD rdd3 = rdd2.partitionBy(new HashPartitioner(reducerCount/*redWork.getNumReduceTasks()*/)); // Two partitions.
+      HiveReduceFunction rf = new HiveReduceFunction(confBytes);
+      JavaPairRDD rdd4 = rdd3.mapPartitionsToPair(rf);
+      rdd4.foreach(HiveVoidFunction.getInstance());
+      try {
+        redWork.getReducer().jobClose(jobConf, true);
+      } catch (HiveException e) {
+        System.out.println("Calling jobClose() failed: " + e);
+        e.printStackTrace();
+      }
+    }
+    
+    return 0;
+  }
+  
+  private JavaPairRDD createRDD(JavaSparkContext sc, JobConf jobConf, MapWork mapWork) {
+    Class ifClass = HiveInputFormat.class;
+
+    // The mapper class is expected by the HiveInputFormat.
+    jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
+    return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
+  }
+
+  private void addJars() {
+    ClassLoader cl = ClassLoader.getSystemClassLoader();
+
+    System.out.println("-----------------------------------------------------");
+    URL[] urls = ((URLClassLoader)cl).getURLs();
+
+    for(URL url: urls){
+      java.io.File file = new java.io.File(url.getFile());
+      if (file.exists() && file.isFile()) {
+        if (file.getName().contains("guava")) {
+          System.out.println("** skipping guava jar **: " + url.getFile());
+        } else {
+          System.out.println("adding jar: " + url.getFile());
+          sc.addJar(url.getFile());
+        }
+      }
+    }
+
+    System.out.println("---------------------------------------------- 
------");
+  }
+
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java?rev=1609166&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java Wed Jul  9 14:16:49 2014
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+
+import scala.Tuple2;
+
+public class SparkCollector implements OutputCollector<BytesWritable, BytesWritable>, Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private List<Tuple2<BytesWritable, BytesWritable>> result = new ArrayList<Tuple2<BytesWritable, BytesWritable>>();
+  
+  @Override
+  public void collect(BytesWritable key, BytesWritable value) throws IOException {
+    result.add(new Tuple2<BytesWritable, BytesWritable>(key, value));
+  }
+  
+  public void clear() {
+    result.clear();
+  }
+
+  public List<Tuple2<BytesWritable, BytesWritable>> getResult() {
+    return result;
+  }
+
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1609166&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Wed Jul  9 14:16:49 2014
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+public class SparkTask extends Task<SparkWork> {
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public int execute(DriverContext driverContext) {
+    SparkClient client = SparkClient.getInstance();
+    return client.execute(driverContext, getWork());
+  }
+
+  @Override
+  public boolean isMapRedTask() {
+    return true;
+  }
+
+  @Override
+  public StageType getType() {
+    return StageType.MAPRED;
+  }
+
+  @Override
+  public String getName() {
+    return "SPARK";
+  }
+
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java?rev=1609166&r1=1609165&r2=1609166&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java Wed Jul  9 14:16:49 2014
@@ -18,14 +18,20 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import java.io.Serializable;
+
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.WritableComparator;
 
 /**
  * HiveKey is a simple wrapper on Text which allows us to set the hashCode
  * easily. hashCode is used for hadoop partitioner.
+ * 
+ * TODO: Spark requires the key to be serializable, even if it's considered serializable by
+ * Hadoop. For now, we let it implement Serializable. However, we expect that this is
+ * not needed soon.
  */
-public class HiveKey extends BytesWritable {
+public class HiveKey extends BytesWritable implements Serializable {
 
   private static final int LENGTH_BYTES = 4;
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1609166&r1=1609165&r2=1609166&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Wed Jul  9 14:16:49 2014
@@ -62,6 +62,7 @@ import org.apache.hadoop.hive.ql.exec.Un
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
@@ -100,6 +101,7 @@ import org.apache.hadoop.hive.ql.plan.Pa
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -853,6 +855,16 @@ public final class GenMapRedUtils {
           ((MapWork)w).deriveExplainAttributes();
         }
       }
+    } else if (task instanceof SparkTask) {
+      SparkWork sw = ((SparkTask)task).getWork();
+      sw.getMapWork().deriveExplainAttributes();
+      HashMap<String, Operator<? extends OperatorDesc>> opMap = sw
+          .getMapWork().getAliasToWork();
+      if (opMap != null && !opMap.isEmpty()) {
+        for (Operator<? extends OperatorDesc> op : opMap.values()) {
+          setKeyAndValueDesc(sw.getReduceWork(), op);
+        }
+      }
     }
 
     if (task.getChildTasks() == null) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java?rev=1609166&r1=1609165&r2=1609166&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java Wed Jul  9 14:16:49 2014
@@ -147,7 +147,9 @@ public class MapReduceCompiler extends T
   }
 
   // loop over all the operators recursively
-  private void breakOperatorTree(Operator<? extends OperatorDesc> topOp) {
+  // TODO: changed from private to protected for SparkCompiler to use. It will be changed back once SparkCompiler
+  //       stands alone.
+  protected void breakOperatorTree(Operator<? extends OperatorDesc> topOp) {
     if (topOp instanceof ReduceSinkOperator) {
       topOp.setChildOperators(null);
     }

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SparkCompiler.java?rev=1609166&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SparkCompiler.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SparkCompiler.java Wed Jul  9 14:16:49 2014
@@ -0,0 +1,117 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+/**
+ * SparkCompiler translates the operator plan into SparkTask.
+ * TODO: currently extending MapReduceCompiler in order to make POC work. It will
+ *       stand alone parallel to MapReduceCompiler.
+ */
+public class SparkCompiler extends MapReduceCompiler {
+
+  protected final Log LOG = LogFactory.getLog(SparkCompiler.class);
+
+  public SparkCompiler() {
+  }
+
+  @Override
+  public void init(HiveConf conf, LogHelper console, Hive db) {
+    super.init(conf, console, db);
+    // Any Spark specific configuration
+  }
+
+  @Override
+  protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
+      Set<WriteEntity> outputs) throws SemanticException {
+  }
+
+  private static int counter = 0;
+  
+  @Override
+  protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
+      List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs)
+      throws SemanticException {
+    super.generateTaskTree(rootTasks, pCtx, mvTask, inputs, outputs);
+
+    MapRedTask mrTask = (MapRedTask) rootTasks.get(0);
+    MapWork mapWork = mrTask.getWork().getMapWork();
+    ReduceWork redWork = mrTask.getWork().getReduceWork();
+    SparkWork sparkWork = new SparkWork("first spark #" + counter++);
+    sparkWork.setMapWork(mapWork);
+    sparkWork.setReduceWork(redWork);
+    SparkTask task = new SparkTask();
+    task.setWork(sparkWork);
+    task.setId(sparkWork.getName());
+    rootTasks.clear();
+    rootTasks.add(task);
+
+    // finally make sure the file sink operators are set up right
+    breakTaskTree(task);
+  }
+
+  private void breakTaskTree(Task<? extends Serializable> task) {
+    if (task instanceof SparkTask) {
+      SparkTask st = (SparkTask) task;
+      SparkWork sw = st.getWork();
+      MapWork mw = sw.getMapWork();
+      HashMap<String, Operator<? extends OperatorDesc>> opMap = mw.getAliasToWork();
+      if (!opMap.isEmpty()) {
+        for (Operator<? extends OperatorDesc> op : opMap.values()) {
+          breakOperatorTree(op);
+        }
+      }
+    }
+  }
+
+  @Override
+  protected void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx,
+      GlobalLimitCtx globalLimitCtx)
+      throws SemanticException {
+    // currently all Spark work is on the cluster
+    return;
+  }
+
+  @Override
+  protected void optimizeTaskPlan(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
+      Context ctx) throws SemanticException {
+  }
+
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java?rev=1609166&r1=1609165&r2=1609166&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java Wed Jul  9 14:16:49 2014
@@ -37,6 +37,8 @@ public class TaskCompilerFactory {
   public static TaskCompiler getCompiler(HiveConf conf, ParseContext parseContext) {
     if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
       return new TezCompiler();
+    } else if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      return new SparkCompiler();
     } else {
       return new MapReduceCompiler();
     }

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java?rev=1609166&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java Wed Jul  9 14:16:49 2014
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This class encapsulates all the work objects that can be executed
+ * in a single Spark job. Currently it's basically a tree with MapWork at the
+ * leaves and ReduceWork in all other nodes.
+ */
+@SuppressWarnings("serial")
+@Explain(displayName = "Spark")
+public class SparkWork extends AbstractOperatorDesc {
+  private static transient final Log logger = LogFactory.getLog(SparkWork.class);
+
+  private static int counter;
+  private final String name;
+  
+  private MapWork mapWork;
+  private ReduceWork redWork;
+
+  public SparkWork(String name) {
+    this.name = name + ":" + (++counter);
+  }
+
+  @Explain(displayName = "DagName")
+  public String getName() {
+    return name;
+  }
+
+  public MapWork getMapWork() {
+    return mapWork;
+  }
+
+  public void setMapWork(MapWork mapWork) {
+    this.mapWork = mapWork;
+  }
+
+  public void setReduceWork(ReduceWork redWork) {
+    this.redWork = redWork;
+  }
+  
+  public ReduceWork getReduceWork() {
+    return redWork;
+  }
+
+}

