Author: xuefu
Date: Tue Jul 29 21:31:40 2014
New Revision: 1614496
URL: http://svn.apache.org/r1614496
Log:
HIVE-7338: Create SparkPlanGenerator (missing new files in previous commit)
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java?rev=1614496&view=auto
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java
(added)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ChainedTran.java
Tue Jul 29 21:31:40 2014
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
+
+public class ChainedTran implements SparkTran {
+ private List<SparkTran> trans;
+
+ public ChainedTran(List<SparkTran> trans) {
+ this.trans = trans;
+ }
+
+ @Override
+ public JavaPairRDD<BytesWritable, BytesWritable> transform(
+ JavaPairRDD<BytesWritable, BytesWritable> input) {
+ JavaPairRDD<BytesWritable, BytesWritable> result= input;
+ for (SparkTran tran : trans) {
+ result = tran.transform(result);
+ }
+ return result;
+ }
+
+}
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java?rev=1614496&view=auto
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
(added)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
Tue Jul 29 21:31:40 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class MapTran implements SparkTran {
+ private HiveMapFunction mapFunc;
+
+ @Override
+ public JavaPairRDD<BytesWritable, BytesWritable> transform(
+ JavaPairRDD<BytesWritable, BytesWritable> input) {
+ return input.mapPartitionsToPair(mapFunc);
+ }
+
+ public void setMapFunction(HiveMapFunction mapFunc) {
+ this.mapFunc = mapFunc;
+ }
+
+}
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java?rev=1614496&view=auto
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
(added)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
Tue Jul 29 21:31:40 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class ReduceTran implements SparkTran {
+ private HiveReduceFunction reduceFunc;
+
+ @Override
+ public JavaPairRDD<BytesWritable, BytesWritable> transform(
+ JavaPairRDD<BytesWritable, BytesWritable> input) {
+ return input.mapPartitionsToPair(reduceFunc);
+ }
+
+ public void setReduceFunction(HiveReduceFunction redFunc) {
+ this.reduceFunc = redFunc;
+ }
+
+}
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java?rev=1614496&view=auto
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
(added)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
Tue Jul 29 21:31:40 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class ShuffleTran implements SparkTran {
+
+ @Override
+ public JavaPairRDD<BytesWritable, BytesWritable> transform(
+ JavaPairRDD<BytesWritable, BytesWritable> input) {
+ return input.partitionBy(new HashPartitioner(1));
+ }
+
+}
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1614496&view=auto
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
(added)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
Tue Jul 29 21:31:40 2014
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class SparkPlan {
+ private JavaPairRDD<BytesWritable, BytesWritable> input;
+ private SparkTran tran;
+
+ public void execute() {
+ JavaPairRDD<BytesWritable, BytesWritable> rdd =
tran.transform(input);
+ rdd.foreach(HiveVoidFunction.getInstance());
+ }
+
+ public SparkTran getTran() {
+ return tran;
+ }
+
+ public void setTran(SparkTran tran) {
+ this.tran = tran;
+ }
+
+ public JavaPairRDD<BytesWritable, BytesWritable> getInput() {
+ return input;
+ }
+
+ public void setInput(JavaPairRDD<BytesWritable, BytesWritable> input) {
+ this.input = input;
+ }
+}
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1614496&view=auto
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
(added)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
Tue Jul 29 21:31:40 2014
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class SparkPlanGenerator {
+ private JavaSparkContext sc;
+ private JobConf jobConf;
+ private Context context;
+ private Path scratchDir;
+
+ public SparkPlanGenerator(JavaSparkContext sc, Context context, JobConf
jobConf, Path scratchDir) {
+ this.sc = sc;
+ this.context = context;
+ this.jobConf = jobConf;
+ this.scratchDir = scratchDir;
+ }
+
+ public SparkPlan generate(SparkWork sparkWork) throws Exception {
+ SparkPlan plan = new SparkPlan();
+ List<SparkTran> trans = new ArrayList<SparkTran>();
+ Set<BaseWork> roots = sparkWork.getRoots();
+ assert(roots != null && roots.size() == 1);
+ BaseWork w = roots.iterator().next();
+ MapWork mapWork = (MapWork) w;
+ trans.add(generate(w));
+ while (sparkWork.getChildren(w).size() > 0) {
+ BaseWork child = sparkWork.getChildren(w).get(0);
+ SparkEdgeProperty edge = sparkWork.getEdgeProperty(w, child);
+ trans.add(generate(edge));
+ trans.add(generate(child));
+ w = child;
+ }
+ ChainedTran chainedTran = new ChainedTran(trans);
+ plan.setTran(chainedTran);
+ JavaPairRDD<BytesWritable, BytesWritable> input = generateRDD(mapWork);
+ plan.setInput(input);
+ return plan;
+ }
+
+ private JavaPairRDD<BytesWritable, BytesWritable> generateRDD(MapWork
mapWork) throws Exception {
+ List<Path> inputPaths = Utilities.getInputPaths(jobConf, mapWork,
scratchDir, context, false);
+ Utilities.setInputPaths(jobConf, inputPaths);
+ Utilities.setMapWork(jobConf, mapWork, scratchDir, true);
+ Class ifClass = HiveInputFormat.class;
+
+ // The mapper class is expected by the HiveInputFormat.
+ jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
+ return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class,
Writable.class);
+ }
+
+ private SparkTran generate(BaseWork bw) throws IOException {
+ if (bw instanceof MapWork) {
+ return generate((MapWork)bw);
+ } else if (bw instanceof ReduceWork) {
+ return generate((ReduceWork)bw);
+ } else {
+ throw new IllegalArgumentException("Only MapWork and ReduceWork are
expected");
+ }
+ }
+
+ private MapTran generate(MapWork mw) throws IOException {
+ MapTran result = new MapTran();
+ Utilities.setMapWork(jobConf, mw, scratchDir, true);
+ Utilities.createTmpDirs(jobConf, mw);
+ jobConf.set("mapred.mapper.class", ExecMapper.class.getName());
+ byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+ HiveMapFunction mapFunc = new HiveMapFunction(confBytes);
+ result.setMapFunction(mapFunc);
+ return result;
+ }
+
+ private ShuffleTran generate(SparkEdgeProperty edge) {
+ // TODO: based on edge type, create groupBy or sortBy transformations.
+ return new ShuffleTran();
+ }
+
+ private ReduceTran generate(ReduceWork rw) throws IOException {
+ ReduceTran result = new ReduceTran();
+ Utilities.setReduceWork(jobConf, rw, scratchDir, true);
+ Utilities.createTmpDirs(jobConf, rw);
+ byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
+ HiveReduceFunction mapFunc = new HiveReduceFunction(confBytes);
+ result.setReduceFunction(mapFunc);
+ return result;
+ }
+
+}
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java?rev=1614496&view=auto
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
(added)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
Tue Jul 29 21:31:40 2014
@@ -0,0 +1,9 @@
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public interface SparkTran {
+ JavaPairRDD<BytesWritable, BytesWritable> transform(
+ JavaPairRDD<BytesWritable, BytesWritable> input);
+}