Author: xuefu
Date: Wed Jul 30 19:22:25 2014
New Revision: 1614747
URL: http://svn.apache.org/r1614747
Log:
HIVE-7556: Fix code style, license header, tabs, etc. [Spark Branch]
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1614747&r1=1614746&r2=1614747&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
Wed Jul 30 19:22:25 2014
@@ -31,7 +31,7 @@ import scala.Tuple2;
public class HiveMapFunction implements
PairFlatMapFunction<Iterator<Tuple2<BytesWritable, BytesWritable>>,
BytesWritable, BytesWritable> {
private static final long serialVersionUID = 1L;
-
+
private transient ExecMapper mapper;
private transient SparkCollector collector;
private transient JobConf jobConf;
@@ -58,9 +58,9 @@ BytesWritable, BytesWritable> {
System.out.println("mapper input: " + input._1() + ", " + input._2());
mapper.map(input._1(), input._2(), collector, Reporter.NULL);
}
-
+
mapper.close();
return collector.getResult();
}
-
+
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1614747&r1=1614746&r2=1614747&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
Wed Jul 30 19:22:25 2014
@@ -35,7 +35,7 @@ import scala.Tuple2;
public class HiveReduceFunction implements
PairFlatMapFunction<Iterator<Tuple2<BytesWritable,BytesWritable>>,
BytesWritable, BytesWritable> {
private static final long serialVersionUID = 1L;
-
+
private transient ExecReducer reducer;
private transient SparkCollector collector;
private transient JobConf jobConf;
@@ -52,7 +52,7 @@ BytesWritable, BytesWritable> {
if (jobConf == null) {
jobConf = KryoSerializer.deserializeJobConf(this.buffer);
jobConf.set("mapred.reducer.class", ExecReducer.class.getName());
-
+
reducer = new ExecReducer();
reducer.configure(jobConf);
collector = new SparkCollector();
@@ -74,14 +74,14 @@ BytesWritable, BytesWritable> {
}
valueList.add(value);
}
-
+
for (Map.Entry<BytesWritable, List<BytesWritable>> entry : clusteredRows.entrySet()) {
// pass on the clustered result to the reducer operator tree.
reducer.reduce(entry.getKey(), entry.getValue().iterator(), collector, Reporter.NULL);
}
-
+
reducer.close();
return collector.getResult();
}
-
+
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java?rev=1614747&r1=1614746&r2=1614747&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveVoidFunction.java
Wed Jul 30 19:22:25 2014
@@ -29,13 +29,13 @@ import scala.Tuple2;
*/
public class HiveVoidFunction implements VoidFunction<Tuple2<BytesWritable,
BytesWritable>> {
private static final long serialVersionUID = 1L;
-
+
private static HiveVoidFunction instance = new HiveVoidFunction();
public static HiveVoidFunction getInstance() {
return instance;
}
-
+
private HiveVoidFunction() {
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java?rev=1614747&r1=1614746&r2=1614747&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/KryoSerializer.java
Wed Jul 30 19:22:25 2014
@@ -73,7 +73,7 @@ public class KryoSerializer {
return out.toByteArray();
}
-
+
public static JobConf deserializeJobConf(byte[] buffer) {
JobConf conf = new JobConf();
try {
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java?rev=1614747&r1=1614746&r2=1614747&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
Wed Jul 30 19:22:25 2014
@@ -22,16 +22,16 @@ import org.apache.hadoop.io.BytesWritabl
import org.apache.spark.api.java.JavaPairRDD;
public class MapTran implements SparkTran {
- private HiveMapFunction mapFunc;
+ private HiveMapFunction mapFunc;
- @Override
- public JavaPairRDD<BytesWritable, BytesWritable> transform(
- JavaPairRDD<BytesWritable, BytesWritable> input) {
- return input.mapPartitionsToPair(mapFunc);
- }
+ @Override
+ public JavaPairRDD<BytesWritable, BytesWritable> transform(
+ JavaPairRDD<BytesWritable, BytesWritable> input) {
+ return input.mapPartitionsToPair(mapFunc);
+ }
- public void setMapFunction(HiveMapFunction mapFunc) {
- this.mapFunc = mapFunc;
- }
+ public void setMapFunction(HiveMapFunction mapFunc) {
+ this.mapFunc = mapFunc;
+ }
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java?rev=1614747&r1=1614746&r2=1614747&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
Wed Jul 30 19:22:25 2014
@@ -23,15 +23,15 @@ import org.apache.spark.api.java.JavaPai
public class ReduceTran implements SparkTran {
private HiveReduceFunction reduceFunc;
-
- @Override
- public JavaPairRDD<BytesWritable, BytesWritable> transform(
- JavaPairRDD<BytesWritable, BytesWritable> input) {
- return input.mapPartitionsToPair(reduceFunc);
- }
- public void setReduceFunction(HiveReduceFunction redFunc) {
- this.reduceFunc = redFunc;
- }
+ @Override
+ public JavaPairRDD<BytesWritable, BytesWritable> transform(
+ JavaPairRDD<BytesWritable, BytesWritable> input) {
+ return input.mapPartitionsToPair(reduceFunc);
+ }
+
+ public void setReduceFunction(HiveReduceFunction redFunc) {
+ this.reduceFunc = redFunc;
+ }
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java?rev=1614747&r1=1614746&r2=1614747&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
Wed Jul 30 19:22:25 2014
@@ -24,10 +24,10 @@ import org.apache.spark.api.java.JavaPai
public class ShuffleTran implements SparkTran {
- @Override
- public JavaPairRDD<BytesWritable, BytesWritable> transform(
- JavaPairRDD<BytesWritable, BytesWritable> input) {
- return input.partitionBy(new HashPartitioner(1));
- }
+ @Override
+ public JavaPairRDD<BytesWritable, BytesWritable> transform(
+ JavaPairRDD<BytesWritable, BytesWritable> input) {
+ return input.partitionBy(new HashPartitioner(1));
+ }
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1614747&r1=1614746&r2=1614747&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
Wed Jul 30 19:22:25 2014
@@ -50,14 +50,14 @@ public class SparkClient implements Seri
private static final String SAPRK_DEFAULT_APP_NAME = "Hive on Spark";
private static SparkClient client;
-
+
public static synchronized SparkClient getInstance(Configuration hiveConf) {
if (client == null) {
client = new SparkClient(hiveConf);
}
return client;
}
-
+
private JavaSparkContext sc;
private List<String> localJars = new ArrayList<String>();
@@ -89,7 +89,7 @@ public class SparkClient implements Seri
String value = properties.getProperty(propertyName);
sparkConf.set(propertyName, properties.getProperty(propertyName));
LOG.info(String.format("load spark configuration from %s (%s -> %s).",
- SPARK_DEFAULT_CONF_FILE, propertyName, value));
+ SPARK_DEFAULT_CONF_FILE, propertyName, value));
}
}
}
@@ -114,7 +114,7 @@ public class SparkClient implements Seri
String value = entry.getValue();
sparkConf.set(propertyName, value);
LOG.info(String.format("load spark configuration from hive configuration (%s -> %s).",
- propertyName, value));
+ propertyName, value));
}
}
@@ -144,7 +144,7 @@ public class SparkClient implements Seri
} catch (IOException e) {
e.printStackTrace();
System.err.println("Error launching map-reduce job" + "\n"
- + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
return 5;
}
@@ -166,7 +166,7 @@ public class SparkClient implements Seri
} catch (IOException e1) {
e1.printStackTrace();
}
-/*
+ /*
try {
Path planPath = new Path(jobConf.getWorkingDirectory(), "plan.xml");
System.out.println("Serializing plan to path: " + planPath);
@@ -177,8 +177,8 @@ public class SparkClient implements Seri
e1.printStackTrace();
return 1;
}
-*/
-/* JavaPairRDD rdd = createRDD(sc, jobConf, mapWork);
+ */
+ /* JavaPairRDD rdd = createRDD(sc, jobConf, mapWork);
byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
HiveMapFunction mf = new HiveMapFunction(confBytes);
JavaPairRDD rdd2 = rdd.mapPartitionsToPair(mf);
@@ -206,7 +206,7 @@ public class SparkClient implements Seri
e.printStackTrace();
}
}
-*/
+ */
SparkPlanGenerator gen = new SparkPlanGenerator(sc, ctx, jobConf, emptyScratchDir);
SparkPlan plan;
try {
@@ -219,7 +219,7 @@ public class SparkClient implements Seri
plan.execute();
return 0;
}
-
+
private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
// add hive-exec jar
String hiveJar = conf.getJar();
@@ -257,7 +257,7 @@ public class SparkClient implements Seri
if (newTmpJars != null && newTmpJars.length > 0) {
for (String tmpJar : newTmpJars) {
if (StringUtils.isNotEmpty(tmpJar) && StringUtils.isNotBlank(tmpJar)
- && !localJars.contains(tmpJar)) {
+ && !localJars.contains(tmpJar)) {
localJars.add(tmpJar);
sc.addJar(tmpJar);
}
@@ -282,7 +282,7 @@ public class SparkClient implements Seri
private void addResources(String addedFiles, List<String> localCache) {
for (String addedFile : addedFiles.split(",")) {
if (StringUtils.isNotEmpty(addedFile) && StringUtils.isNotBlank(addedFile)
- && !localCache.contains(addedFile)) {
+ && !localCache.contains(addedFile)) {
localCache.add(addedFile);
sc.addFile(addedFile);
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java?rev=1614747&r1=1614746&r2=1614747&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkCollector.java
Wed Jul 30 19:22:25 2014
@@ -32,19 +32,19 @@ public class SparkCollector implements O
private static final long serialVersionUID = 1L;
private List<Tuple2<BytesWritable, BytesWritable>> result = new
ArrayList<Tuple2<BytesWritable, BytesWritable>>();
-
+
@Override
public void collect(BytesWritable key, BytesWritable value) throws
IOException {
result.add(new Tuple2<BytesWritable,
BytesWritable>(copyBytesWritable(key), copyBytesWritable(value)));
}
-
+
// TODO: Move this to a utility class.
public static BytesWritable copyBytesWritable(BytesWritable bw) {
BytesWritable copy = new BytesWritable();
copy.set(bw);
return copy;
}
-
+
public void clear() {
result.clear();
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java?rev=1614747&r1=1614746&r2=1614747&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java
Wed Jul 30 19:22:25 2014
@@ -22,27 +22,27 @@ import org.apache.hadoop.io.BytesWritabl
import org.apache.spark.api.java.JavaPairRDD;
public class SparkPlan {
- private JavaPairRDD<BytesWritable, BytesWritable> input;
- private SparkTran tran;
-
- public void execute() {
- JavaPairRDD<BytesWritable, BytesWritable> rdd = tran.transform(input);
- rdd.foreach(HiveVoidFunction.getInstance());
- }
-
- public SparkTran getTran() {
- return tran;
- }
-
- public void setTran(SparkTran tran) {
- this.tran = tran;
- }
-
- public JavaPairRDD<BytesWritable, BytesWritable> getInput() {
- return input;
- }
-
- public void setInput(JavaPairRDD<BytesWritable, BytesWritable> input) {
- this.input = input;
- }
+ private JavaPairRDD<BytesWritable, BytesWritable> input;
+ private SparkTran tran;
+
+ public void execute() {
+ JavaPairRDD<BytesWritable, BytesWritable> rdd = tran.transform(input);
+ rdd.foreach(HiveVoidFunction.getInstance());
+ }
+
+ public SparkTran getTran() {
+ return tran;
+ }
+
+ public void setTran(SparkTran tran) {
+ this.tran = tran;
+ }
+
+ public JavaPairRDD<BytesWritable, BytesWritable> getInput() {
+ return input;
+ }
+
+ public void setInput(JavaPairRDD<BytesWritable, BytesWritable> input) {
+ this.input = input;
+ }
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1614747&r1=1614746&r2=1614747&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
Wed Jul 30 19:22:25 2014
@@ -46,7 +46,7 @@ public class SparkTask extends Task<Spar
}
return rc;
}
-
+
/**
* close will move the temp files into the right place for the fetch
* task. If the job has failed it will clean up the files.
@@ -64,7 +64,7 @@ public class SparkTask extends Task<Spar
if (rc == 0) {
rc = 3;
String mesg = "Job Commit failed with exception '"
- + Utilities.getNameMessage(e) + "'";
+ + Utilities.getNameMessage(e) + "'";
console.printError(mesg, "\n" + StringUtils.stringifyException(e));
}
}
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
URL:
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java?rev=1614747&r1=1614746&r2=1614747&view=diff
==============================================================================
---
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
(original)
+++
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
Wed Jul 30 19:22:25 2014
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.hadoop.hive.ql.exec.spark;
import org.apache.hadoop.io.BytesWritable;
@@ -5,5 +23,5 @@ import org.apache.spark.api.java.JavaPai
public interface SparkTran {
JavaPairRDD<BytesWritable, BytesWritable> transform(
- JavaPairRDD<BytesWritable, BytesWritable> input);
+ JavaPairRDD<BytesWritable, BytesWritable> input);
}