Author: xuefu
Date: Wed Jul 9 14:16:49 2014
New Revision: 1609166

URL: http://svn.apache.org/r1609166
Log:
HIVE-7370: Initial ground work for Hive on Spark [Spark branch]
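For reference, a minimal sketch of how the new "spark" value accepted by hive.execution.engine (see the HiveConf and TaskCompilerFactory changes below) could be selected from client code. This is not part of the patch; the class name SparkEngineExample is illustrative, and only the HiveConf calls shown are assumed to exist.

    import org.apache.hadoop.hive.conf.HiveConf;

    public class SparkEngineExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // "spark" joins "mr" and "tez" as a legal engine value once the
        // StringsValidator change below is in place.
        conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
        // With this setting, TaskCompilerFactory (modified below) returns a
        // SparkCompiler, which compiles the operator plan into a SparkTask.
        System.out.println(conf.getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE)); // prints: spark
      }
    }

The same switch can presumably be made per session with "set hive.execution.engine=spark;", mirroring the existing mr/tez selection.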
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SparkCompiler.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java hive/branches/spark/pom.xml hive/branches/spark/ql/pom.xml hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java Modified: hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java URL: http://svn.apache.org/viewvc/hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1609166&r1=1609165&r2=1609166&view=diff ============================================================================== --- hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original) +++ hive/branches/spark/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Jul 9 14:16:49 2014 @@ -995,7 +995,7 @@ public class HiveConf extends Configurat HIVE_DECODE_PARTITION_NAME("hive.decode.partition.name", false), HIVE_EXECUTION_ENGINE("hive.execution.engine", "mr", - new StringsValidator("mr", "tez")), + new StringsValidator("mr", "tez", "spark")), HIVE_JAR_DIRECTORY("hive.jar.directory", null), HIVE_USER_INSTALL_DIR("hive.user.install.directory", "hdfs:///user/"), Modified: hive/branches/spark/pom.xml URL: http://svn.apache.org/viewvc/hive/branches/spark/pom.xml?rev=1609166&r1=1609165&r2=1609166&view=diff ============================================================================== --- hive/branches/spark/pom.xml (original) +++ hive/branches/spark/pom.xml Wed Jul 9 14:16:49 2014 @@ -105,7 +105,7 @@ <commons-lang3.version>3.1</commons-lang3.version> <commons-logging.version>1.1.3</commons-logging.version> <derby.version>10.10.1.1</derby.version> - <guava.version>11.0.2</guava.version> + <guava.version>14.0.1</guava.version> <groovy.version>2.1.6</groovy.version> <hadoop-20.version>0.20.2</hadoop-20.version> <hadoop-20S.version>1.2.1</hadoop-20S.version> @@ -145,6 +145,9 @@ <slf4j.version>1.7.5</slf4j.version> <ST4.version>4.0.4</ST4.version> <tez.version>0.4.0-incubating</tez.version> + <spark.version>1.0.1-SNAPSHOT</spark.version> + <scala.binary.version>2.10</scala.binary.version> + <scala.version>2.10.4</scala.version> <tempus-fugit.version>1.1</tempus-fugit.version> <snappy.version>0.2</snappy.version> 
<wadl-resourcedoc-doclet.version>1.4</wadl-resourcedoc-doclet.version> @@ -1014,6 +1017,16 @@ <artifactId>hadoop-minicluster</artifactId> <version>${hadoop-23.version}</version> </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + </dependency> </dependencies> </dependencyManagement> </profile> Modified: hive/branches/spark/ql/pom.xml URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/pom.xml?rev=1609166&r1=1609165&r2=1609166&view=diff ============================================================================== --- hive/branches/spark/ql/pom.xml (original) +++ hive/branches/spark/ql/pom.xml Wed Jul 9 14:16:49 2014 @@ -327,6 +327,13 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + <optional>true</optional> + </dependency> </dependencies> <profiles> @@ -533,7 +540,7 @@ <include>org.iq80.snappy:snappy</include> <include>org.codehaus.jackson:jackson-core-asl</include> <include>org.codehaus.jackson:jackson-mapper-asl</include> - <include>com.google.guava:guava</include> + <!--include>com.google.guava:guava</include--> </includes> </artifactSet> <relocations> Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1609166&r1=1609165&r2=1609166&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Context.java Wed Jul 9 14:16:49 2014 @@ -532,7 +532,12 @@ public class Context { * Today this translates into running hadoop jobs locally */ public boolean isLocalOnlyExecutionMode() { - return ShimLoader.getHadoopShims().isLocalMode(conf); + // TODO: hack to allow spark to run in a cluster mode. Without this, depending on + // user's local hadoop settings, true may be returned, which causes plan to be + // stored in local path. This will be addressed in subsquent patches, where false is + // returned for spark always. 
+ return false; +// return ShimLoader.getHadoopShims().isLocalMode(conf); } public List<HiveLock> getHiveLocks() { Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1609166&r1=1609165&r2=1609166&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Wed Jul 9 14:16:49 2014 @@ -1209,8 +1209,9 @@ public class Driver implements CommandPr } } - int jobs = Utilities.getMRTasks(plan.getRootTasks()).size() - + Utilities.getTezTasks(plan.getRootTasks()).size(); + int jobs = Utilities.getMRTasks(plan.getRootTasks()).size() + + Utilities.getTezTasks(plan.getRootTasks()).size() + + Utilities.getSparkTasks(plan.getRootTasks()).size(); if (jobs > 0) { console.printInfo("Query ID = " + plan.getQueryId()); console.printInfo("Total jobs = " + jobs); Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1609166&r1=1609165&r2=1609166&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Jul 9 14:16:49 2014 @@ -110,6 +110,7 @@ import org.apache.hadoop.hive.ql.exec.mr import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; import org.apache.hadoop.hive.ql.exec.mr.ExecReducer; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; +import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; @@ -305,15 +306,18 @@ public final class Utilities { InputStream in = null; try { path = getPlanPath(conf, name); + LOG.info("PLAN PATH = " + path); assert path != null; if (!gWorkMap.containsKey(path)) { Path localPath; if (ShimLoader.getHadoopShims().isLocalMode(conf)) { localPath = path; } else { + LOG.info("***************non-local mode***************"); localPath = new Path(name); } - + localPath = path; + LOG.info("local path = " + localPath); if (HiveConf.getBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN)) { LOG.debug("Loading plan from string: "+path.toUri().getPath()); String planString = conf.get(path.toUri().getPath()); @@ -325,7 +329,9 @@ public final class Utilities { in = new ByteArrayInputStream(planBytes); in = new InflaterInputStream(in); } else { - in = new FileInputStream(localPath.toUri().getPath()); + LOG.info("Open file to read in plan: " + localPath); +// in = new FileInputStream(localPath.toUri().getPath()); + in = localPath.getFileSystem(conf).open(localPath); } if(MAP_PLAN_NAME.equals(name)){ @@ -357,6 +363,7 @@ public final class Utilities { return gWork; } catch (FileNotFoundException fnf) { // happens. e.g.: no reduce work. + LOG.info("File not found: " + fnf.getMessage()); LOG.info("No plan file found: "+path); return null; } catch (Exception e) { @@ -2367,6 +2374,26 @@ public final class Utilities { } } + public static List<SparkTask> getSparkTasks(List<Task<? 
extends Serializable>> tasks) { + List<SparkTask> sparkTasks = new ArrayList<SparkTask>(); + if (tasks != null) { + getSparkTasks(tasks, sparkTasks); + } + return sparkTasks; + } + + private static void getSparkTasks(List<Task<? extends Serializable>> tasks, List<SparkTask> sparkTasks) { + for (Task<? extends Serializable> task : tasks) { + if (task instanceof SparkTask && !sparkTasks.contains(task)) { + sparkTasks.add((SparkTask) task); + } + + if (task.getDependentTasks() != null) { + getSparkTasks(task.getDependentTasks(), sparkTasks); + } + } + } + public static List<ExecDriver> getMRTasks(List<Task<? extends Serializable>> tasks) { List<ExecDriver> mrTasks = new ArrayList<ExecDriver>(); if (tasks != null) { @@ -3408,7 +3435,7 @@ public final class Utilities { if (origUmask != null) { conf.set("fs.permissions.umask-mode", origUmask); } else { - conf.unset("fs.permissions.umask-mode"); + //conf.unset("fs.permissions.umask-mode"); } } Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1609166&view=auto ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java (added) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java Wed Jul 9 14:16:49 2014 @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import java.util.Iterator; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.spark.api.java.function.PairFlatMapFunction; + +import scala.Tuple2; + +public class HiveMapFunction implements PairFlatMapFunction<Iterator<Tuple2<BytesWritable, BytesWritable>>, +BytesWritable, BytesWritable> { + private static final long serialVersionUID = 1L; + + private transient ExecMapper mapper; + private transient SparkCollector collector; + private transient JobConf jobConf; + + private byte[] buffer; + + public HiveMapFunction(byte[] buffer) { + this.buffer = buffer; + } + + @Override + public Iterable<Tuple2<BytesWritable, BytesWritable>> + call(Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception { + if (jobConf == null) { + jobConf = KryoSerializer.deserializeJobConf(this.buffer); + Path path = new Path(jobConf.getWorkingDirectory(), "plan.xml"); + FSDataInputStream in = path.getFileSystem(jobConf).open(path); + MapWork mw = Utilities.deserializePlan(in, MapWork.class, jobConf); + + Utilities.setMapWork(jobConf, mw); + mapper = new ExecMapper(); + mapper.configure(jobConf); + collector = new SparkCollector(); + } + + collector.clear(); + while(it.hasNext() && !ExecMapper.getDone()) { + Tuple2<BytesWritable, BytesWritable> input = it.next(); + mapper.map(input._1(), input._2(), collector, Reporter.NULL); + } + + mapper.close(); + return collector.getResult(); + } + +} Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1609166&view=auto ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java (added) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java Wed Jul 9 14:16:49 2014 @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.ql.exec.mr.ExecReducer; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.spark.api.java.function.PairFlatMapFunction; + +import scala.Tuple2; + +public class HiveReduceFunction implements PairFlatMapFunction<Iterator<Tuple2<BytesWritable,BytesWritable>>, +BytesWritable, BytesWritable> { + private static final long serialVersionUID = 1L; + + private transient ExecReducer reducer; + private transient SparkCollector collector; + private transient JobConf jobConf; + + private byte[] buffer; + + public HiveReduceFunction(byte[] buffer) { + this.buffer = buffer; + } + + @Override + public Iterable<Tuple2<BytesWritable, BytesWritable>> call(Iterator<Tuple2<BytesWritable,BytesWritable>> it) + throws Exception { + if (jobConf == null) { + jobConf = KryoSerializer.deserializeJobConf(this.buffer); + jobConf.set("mapred.reducer.class", ExecReducer.class.getName()); + + reducer = new ExecReducer(); + reducer.configure(jobConf); + collector = new SparkCollector(); + } + + collector.clear(); + Map<BytesWritable, List<BytesWritable>> clusteredRows = + new HashMap<BytesWritable, List<BytesWritable>>(); + while (it.hasNext()) { + Tuple2<BytesWritable, BytesWritable> input = it.next(); + BytesWritable key = input._1(); + BytesWritable value = input._2(); + System.out.println("reducer row: " + key + "/" + value); + // cluster the input according to key. + List<BytesWritable> valueList = clusteredRows.get(key); + if (valueList == null) { + valueList = new ArrayList<BytesWritable>(); + clusteredRows.put(key, valueList); + } + valueList.add(value); + } + + for (Map.Entry<BytesWritable, List<BytesWritable>> entry : clusteredRows.entrySet()) { + // pass on the clustered result to the reducer operator tree. + reducer.reduce(entry.getKey(), entry.getValue().iterator(), collector, Reporter.NULL); + } + + reducer.close(); + return collector.getResult(); + } + +} Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java?rev=1609166&view=auto ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java (added) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java Wed Jul 9 14:16:49 2014 @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import org.apache.spark.api.java.function.VoidFunction; + +/** + * Implementation of a voidFunction that does nothing. + * + */ +public class HiveVoidFunction implements VoidFunction<Object> { + private static final long serialVersionUID = 1L; + + private static HiveVoidFunction instance = new HiveVoidFunction(); + + public static HiveVoidFunction getInstance() { + return instance; + } + + private HiveVoidFunction() { + } + + @Override + public void call(Object arg0) throws Exception { + } + +} Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java?rev=1609166&view=auto ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java (added) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java Wed Jul 9 14:16:49 2014 @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; +import org.apache.hadoop.mapred.JobConf; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +public class KryoSerializer { + private static final Kryo kryo = Utilities.runtimeSerializationKryo.get(); + + static { + kryo.register(ExecMapper.class); + } + + public static byte[] serialize(Object object) { + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + Output output = new Output(stream); + + kryo.writeObject(output, object); + + output.close(); // close() also calls flush() + return stream.toByteArray(); + } + + public static <T> T deserialize(byte[] buffer,Class<T> clazz) { + return kryo.readObject(new Input(new ByteArrayInputStream(buffer)), clazz); + } + + public static byte[] serializeJobConf(JobConf jobConf) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try { + jobConf.write(new DataOutputStream(out)); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + return null; + } finally { + try { + out.close(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + return out.toByteArray(); + + } + + public static JobConf deserializeJobConf(byte[] buffer) { + JobConf conf = new JobConf(); + try { + conf.readFields(new DataInputStream(new ByteArrayInputStream(buffer))); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + return null; + } + return conf; + } + +} Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1609166&view=auto ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (added) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Wed Jul 9 14:16:49 2014 @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.Serializable; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.mr.ExecMapper; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.spark.HashPartitioner; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; + +public class SparkClient implements Serializable { + private static final long serialVersionUID = 1L; + + private static String masterUrl = "local"; + + private static String appName = "Hive-Spark"; + + private static String sparkHome = "/home/xzhang/apache/spark"; + + private static int reducerCount = 5; + + private static String execMem = "1g"; + private static String execJvmOpts = ""; + + static { + String envSparkHome = System.getenv("SPARK_HOME"); + if (envSparkHome != null) { + sparkHome = envSparkHome; + } + + String envMaster = System.getenv("MASTER"); + if (envMaster != null) { + masterUrl = envMaster; + } + + String reducers = System.getenv("REDUCERS"); + if (reducers != null) { + reducerCount = Integer.valueOf(reducers); + } + + String mem = System.getenv("spark_executor_memory"); + if (mem != null) { + execMem = mem; + } + + String jopts = System.getenv("spark_executor_extraJavaOptions"); + if (jopts != null) { + execJvmOpts = jopts; + } + + } + + private static SparkClient client = new SparkClient(); + + public static SparkClient getInstance() { + return client; + } + + private JavaSparkContext sc; + + private SparkClient() { + SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(masterUrl).setSparkHome(sparkHome); + sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); + sparkConf.set("spark.default.parallelism", "1"); + sparkConf.set("spark.executor.memory", execMem); + sparkConf.set("spark.executor.extraJavaOptions", execJvmOpts); + sc = new JavaSparkContext(sparkConf); + addJars(); + } + + public int execute(DriverContext driverContext, SparkWork sparkWork) { + int rc = 1; +// System.out.println("classpath=\n"+System.getProperty("java.class.path") + "\n"); + MapWork mapWork = sparkWork.getMapWork(); + ReduceWork redWork = sparkWork.getReduceWork(); + + Configuration hiveConf = driverContext.getCtx().getConf(); + // TODO: need to combine spark conf and hive conf + JobConf jobConf = new JobConf(hiveConf); + + Context ctx = driverContext.getCtx(); + Path emptyScratchDir; + try { + if (ctx == null) { + ctx = new Context(jobConf); + } + + emptyScratchDir = ctx.getMRTmpPath(); + FileSystem fs = emptyScratchDir.getFileSystem(jobConf); + fs.mkdirs(emptyScratchDir); + } catch (IOException e) { + e.printStackTrace(); + System.err.println("Error 
launching map-reduce job" + "\n" + + org.apache.hadoop.util.StringUtils.stringifyException(e)); + return 5; + } + + List<Path> inputPaths; + try { + inputPaths = Utilities.getInputPaths(jobConf, mapWork, emptyScratchDir, ctx); + } catch (Exception e2) { + e2.printStackTrace(); + return -1; + } + Utilities.setInputPaths(jobConf, inputPaths); + Utilities.setMapWork(jobConf, mapWork, emptyScratchDir, true); + if (redWork != null) + Utilities.setReduceWork(jobConf, redWork, emptyScratchDir, true); + + try { + Utilities.createTmpDirs(jobConf, mapWork); + Utilities.createTmpDirs(jobConf, redWork); + } catch (IOException e1) { + e1.printStackTrace(); + } + + try { + Path planPath = new Path(jobConf.getWorkingDirectory(), "plan.xml"); + System.out.println("Serializing plan to path: " + planPath); + OutputStream os2 = planPath.getFileSystem(jobConf).create(planPath); + Utilities.serializePlan(mapWork, os2, jobConf); + } catch (IOException e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + return 1; + } + + JavaPairRDD rdd = createRDD(sc, jobConf, mapWork); + byte[] confBytes = KryoSerializer.serializeJobConf(jobConf); + HiveMapFunction mf = new HiveMapFunction(confBytes); + JavaPairRDD rdd2 = rdd.mapPartitionsToPair(mf); + if (redWork == null) { + rdd2.foreach(HiveVoidFunction.getInstance()); + if (mapWork.getAliasToWork() != null) { + for (Operator<? extends OperatorDesc> op : mapWork.getAliasToWork().values()) { + try { + op.jobClose(jobConf, true); + } catch (HiveException e) { + System.out.println("Calling jobClose() failed: " + e); + e.printStackTrace(); + } + } + } + } else { + JavaPairRDD rdd3 = rdd2.partitionBy(new HashPartitioner(reducerCount/*redWork.getNumReduceTasks()*/)); // Two partitions. + HiveReduceFunction rf = new HiveReduceFunction(confBytes); + JavaPairRDD rdd4 = rdd3.mapPartitionsToPair(rf); + rdd4.foreach(HiveVoidFunction.getInstance()); + try { + redWork.getReducer().jobClose(jobConf, true); + } catch (HiveException e) { + System.out.println("Calling jobClose() failed: " + e); + e.printStackTrace(); + } + } + + return 0; + } + + private JavaPairRDD createRDD(JavaSparkContext sc, JobConf jobConf, MapWork mapWork) { + Class ifClass = HiveInputFormat.class; + + // The mapper class is expected by the HiveInputFormat. 
+ jobConf.set("mapred.mapper.class", ExecMapper.class.getName()); + return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class); + } + + private void addJars() { + ClassLoader cl = ClassLoader.getSystemClassLoader(); + + System.out.println("-----------------------------------------------------"); + URL[] urls = ((URLClassLoader)cl).getURLs(); + + for(URL url: urls){ + java.io.File file = new java.io.File(url.getFile()); + if (file.exists() && file.isFile()) { + if (file.getName().contains("guava")) { + System.out.println("** skipping guava jar **: " + url.getFile()); + } else { + System.out.println("adding jar: " + url.getFile()); + sc.addJar(url.getFile()); + } + } + } + + System.out.println("---------------------------------------------- ------"); + } + +} Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java?rev=1609166&view=auto ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java (added) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java Wed Jul 9 14:16:49 2014 @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.OutputCollector; + +import scala.Tuple2; + +public class SparkCollector implements OutputCollector<BytesWritable, BytesWritable>, Serializable { + private static final long serialVersionUID = 1L; + + private List<Tuple2<BytesWritable, BytesWritable>> result = new ArrayList<Tuple2<BytesWritable, BytesWritable>>(); + + @Override + public void collect(BytesWritable key, BytesWritable value) throws IOException { + result.add(new Tuple2<BytesWritable, BytesWritable>(key, value)); + } + + public void clear() { + result.clear(); + } + + public List<Tuple2<BytesWritable, BytesWritable>> getResult() { + return result; + } + +} Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1609166&view=auto ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (added) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Wed Jul 9 14:16:49 2014 @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.spark; + +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.hive.ql.plan.api.StageType; + +public class SparkTask extends Task<SparkWork> { + private static final long serialVersionUID = 1L; + + @Override + public int execute(DriverContext driverContext) { + SparkClient client = SparkClient.getInstance(); + return client.execute(driverContext, getWork()); + } + + @Override + public boolean isMapRedTask() { + return true; + } + + @Override + public StageType getType() { + return StageType.MAPRED; + } + + @Override + public String getName() { + return "SPARK"; + } + +} Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java?rev=1609166&r1=1609165&r2=1609166&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java Wed Jul 9 14:16:49 2014 @@ -18,14 +18,20 @@ package org.apache.hadoop.hive.ql.io; +import java.io.Serializable; + import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.WritableComparator; /** * HiveKey is a simple wrapper on Text which allows us to set the hashCode * easily. hashCode is used for hadoop partitioner. + * + * TODO: spark require key to be serializable, even if it's considered serializable by + * Hadoop. For now, we let it implement Serializable. However, we expect that this is + * not needed soon. */ -public class HiveKey extends BytesWritable { +public class HiveKey extends BytesWritable implements Serializable { private static final int LENGTH_BYTES = 4; Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1609166&r1=1609165&r2=1609166&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Wed Jul 9 14:16:49 2014 @@ -62,6 +62,7 @@ import org.apache.hadoop.hive.ql.exec.Un import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; +import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork; @@ -100,6 +101,7 @@ import org.apache.hadoop.hive.ql.plan.Pa import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.SparkWork; import org.apache.hadoop.hive.ql.plan.StatsWork; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; @@ -853,6 +855,16 @@ public final class GenMapRedUtils { ((MapWork)w).deriveExplainAttributes(); } } + } else if (task instanceof SparkTask) { + SparkWork sw = ((SparkTask)task).getWork(); + 
sw.getMapWork().deriveExplainAttributes(); + HashMap<String, Operator<? extends OperatorDesc>> opMap = sw + .getMapWork().getAliasToWork(); + if (opMap != null && !opMap.isEmpty()) { + for (Operator<? extends OperatorDesc> op : opMap.values()) { + setKeyAndValueDesc(sw.getReduceWork(), op); + } + } } if (task.getChildTasks() == null) { Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java?rev=1609166&r1=1609165&r2=1609166&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/MapReduceCompiler.java Wed Jul 9 14:16:49 2014 @@ -147,7 +147,9 @@ public class MapReduceCompiler extends T } // loop over all the operators recursively - private void breakOperatorTree(Operator<? extends OperatorDesc> topOp) { + // TODO: changed from private to protected for SparkCompiler to use. It will be changed back onece SparkCompiler + // stands alone. + protected void breakOperatorTree(Operator<? extends OperatorDesc> topOp) { if (topOp instanceof ReduceSinkOperator) { topOp.setChildOperators(null); } Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SparkCompiler.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SparkCompiler.java?rev=1609166&view=auto ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SparkCompiler.java (added) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SparkCompiler.java Wed Jul 9 14:16:49 2014 @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; +import org.apache.hadoop.hive.ql.exec.spark.SparkTask; +import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.ql.plan.MoveWork; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; + +/** + * SparkCompiler translates the operator plan into SparkTask. + * TODO: currently extending MapReduceCompiler in order to make POC work. It will + * stand alone parallel to MapReduceCompiler. + */ +public class SparkCompiler extends MapReduceCompiler { + + protected final Log LOG = LogFactory.getLog(SparkCompiler.class); + + public SparkCompiler() { + } + + @Override + public void init(HiveConf conf, LogHelper console, Hive db) { + super.init(conf, console, db); + // Any Spark specific configuration + } + + @Override + protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs, + Set<WriteEntity> outputs) throws SemanticException { + } + + private static int counter = 0; + + @Override + protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx, + List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs, Set<WriteEntity> outputs) + throws SemanticException { + super.generateTaskTree(rootTasks, pCtx, mvTask, inputs, outputs); + + MapRedTask mrTask = (MapRedTask) rootTasks.get(0); + MapWork mapWork = mrTask.getWork().getMapWork(); + ReduceWork redWork = mrTask.getWork().getReduceWork(); + SparkWork sparkWork = new SparkWork("first spark #" + counter++); + sparkWork.setMapWork(mapWork); + sparkWork.setReduceWork(redWork); + SparkTask task = new SparkTask(); + task.setWork(sparkWork); + task.setId(sparkWork.getName()); + rootTasks.clear(); + rootTasks.add(task); + + // finally make sure the file sink operators are set up right + breakTaskTree(task); + } + + private void breakTaskTree(Task<? extends Serializable> task) { + if (task instanceof SparkTask) { + SparkTask st = (SparkTask) task; + SparkWork sw = st.getWork(); + MapWork mw = sw.getMapWork(); + HashMap<String, Operator<? extends OperatorDesc>> opMap = mw.getAliasToWork(); + if (!opMap.isEmpty()) { + for (Operator<? extends OperatorDesc> op : opMap.values()) { + breakOperatorTree(op); + } + } + } + } + + @Override + protected void decideExecMode(List<Task<? extends Serializable>> rootTasks, Context ctx, + GlobalLimitCtx globalLimitCtx) + throws SemanticException { + // currently all Spark work is on the cluster + return; + } + + @Override + protected void optimizeTaskPlan(List<Task<? 
extends Serializable>> rootTasks, ParseContext pCtx, + Context ctx) throws SemanticException { + } + +} Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java?rev=1609166&r1=1609165&r2=1609166&view=diff ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java (original) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompilerFactory.java Wed Jul 9 14:16:49 2014 @@ -37,6 +37,8 @@ public class TaskCompilerFactory { public static TaskCompiler getCompiler(HiveConf conf, ParseContext parseContext) { if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { return new TezCompiler(); + } else if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) { + return new SparkCompiler(); } else { return new MapReduceCompiler(); } Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java?rev=1609166&view=auto ============================================================================== --- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java (added) +++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java Wed Jul 9 14:16:49 2014 @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.plan; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * This class encapsulates all the work objects that can be executed + * in a single Spark job. Currently it's basically a tree with MapWork at the + * leaves and and ReduceWork in all other nodes. + */ +@SuppressWarnings("serial") +@Explain(displayName = "Spark") +public class SparkWork extends AbstractOperatorDesc { + private static transient final Log logger = LogFactory.getLog(SparkWork.class); + + private static int counter; + private final String name; + + private MapWork mapWork; + private ReduceWork redWork; + + public SparkWork(String name) { + this.name = name + ":" + (++counter); + } + + @Explain(displayName = "DagName") + public String getName() { + return name; + } + + public MapWork getMapWork() { + return mapWork; + } + + public void setMapWork(MapWork mapWork) { + this.mapWork = mapWork; + } + + public void setReduceWork(ReduceWork redWork) { + this.redWork = redWork; + } + + public ReduceWork getReduceWork() { + return redWork; + } + +}
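The JobConf handoff is the glue between driver and executors in this patch: SparkClient.execute() serializes the job configuration with KryoSerializer and passes the bytes to HiveMapFunction/HiveReduceFunction, which rebuild the JobConf lazily inside call(). Below is a minimal round-trip sketch, not part of the commit, using only the methods added above; the class name KryoConfRoundTrip and the property being set are illustrative.

    import org.apache.hadoop.hive.ql.exec.spark.KryoSerializer;
    import org.apache.hadoop.mapred.JobConf;

    public class KryoConfRoundTrip {
      public static void main(String[] args) {
        // Driver side (SparkClient.execute): capture the JobConf as a byte[] so it
        // can be shipped to executors inside the serialized Hive*Function objects.
        JobConf driverConf = new JobConf();
        driverConf.set("hive.execution.engine", "spark");
        byte[] confBytes = KryoSerializer.serializeJobConf(driverConf);

        // Executor side (call()): rebuild the JobConf once, on first invocation.
        JobConf executorConf = KryoSerializer.deserializeJobConf(confBytes);
        System.out.println(executorConf.get("hive.execution.engine")); // prints: spark
      }
    }

Shipping the configuration as bytes keeps the heavyweight JobConf out of the Java-serialized closure; only the compact byte[] field of the two functions travels with the Spark job. Note that, despite the class name, serializeJobConf/deserializeJobConf use Hadoop Writable serialization rather than Kryo, and both return null on IOException.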