Author: xuefu
Date: Mon Sep 29 18:05:53 2014
New Revision: 1628239
URL: http://svn.apache.org/r1628239
Log:
HIVE-7627: FSStatsPublisher does fit into Spark multi-thread task mode[Spark Branch] (Chengxiang via Xuefu)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1628239&r1=1628238&r2=1628239&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java Mon Sep 29 18:05:53 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.io.merg
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
@@ -47,6 +48,8 @@ public class HiveMapFunction implements
call(Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
if (jobConf == null) {
jobConf = KryoSerializer.deserializeJobConf(this.buffer);
+ // set mapred.task.partition in executor side.
+ jobConf.setInt("mapred.task.partition", TaskContext.get().getPartitionId());
}
SparkRecordHandler mapRecordHandler;
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1628239&r1=1628238&r2=1628239&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java Mon Sep 29 18:05:53 2014
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.io.Hive
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
@@ -46,6 +47,8 @@ public class HiveReduceFunction implemen
call(Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> it) throws Exception
{
if (jobConf == null) {
jobConf = KryoSerializer.deserializeJobConf(this.buffer);
+ // set mapred.task.partition in executor side.
+ jobConf.setInt("mapred.task.partition", TaskContext.get().getPartitionId());
}
SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler();