Author: xuefu
Date: Mon Sep 29 18:05:53 2014
New Revision: 1628239

URL: http://svn.apache.org/r1628239
Log:
HIVE-7627: FSStatsPublisher does fit into Spark multi-thread task mode[Spark 
Branch] (Chengxiang via Xuefu)

Modified:
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
    
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1628239&r1=1628238&r2=1628239&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
 Mon Sep 29 18:05:53 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.io.merg
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 
 import scala.Tuple2;
@@ -47,6 +48,8 @@ public class HiveMapFunction implements 
   call(Iterator<Tuple2<BytesWritable, BytesWritable>> it) throws Exception {
     if (jobConf == null) {
       jobConf = KryoSerializer.deserializeJobConf(this.buffer);
+      // set mapred.task.partition in executor side.
+      jobConf.setInt("mapred.task.partition", 
TaskContext.get().getPartitionId());
     }
 
     SparkRecordHandler mapRecordHandler;

Modified: 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL: 
http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1628239&r1=1628238&r2=1628239&view=diff
==============================================================================
--- 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
 (original)
+++ 
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
 Mon Sep 29 18:05:53 2014
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 
 import scala.Tuple2;
@@ -46,6 +47,8 @@ public class HiveReduceFunction implemen
   call(Iterator<Tuple2<HiveKey, Iterable<BytesWritable>>> it) throws Exception 
{
     if (jobConf == null) {
       jobConf = KryoSerializer.deserializeJobConf(this.buffer);
+      // set mapred.task.partition in executor side.
+      jobConf.setInt("mapred.task.partition", 
TaskContext.get().getPartitionId());
     }
 
     SparkReduceRecordHandler reducerRecordhandler = new 
SparkReduceRecordHandler();


Reply via email to