Author: xuefu
Date: Tue Dec 22 20:16:37 2015
New Revision: 1721458

URL: http://svn.apache.org/viewvc?rev=1721458&view=rev
Log:
PIG-4765: Enable TestPoissonSampleLoader in spark mode (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1721458&r1=1721457&r2=1721458&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Tue Dec 22 20:16:37 2015
@@ -19,12 +19,14 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.UDFContext;
@@ -36,10 +38,15 @@ public class PigInputFormatSpark extends
        public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
                        TaskAttemptContext context) throws IOException,
                        InterruptedException {
-               init();
-               resetUDFContext();
-               return super.createRecordReader(split, context);
-       }
+        init();
+        resetUDFContext();
+        RecordReader recordReader = super.createRecordReader(split, context);
+        // PigSplit#conf is the default Hadoop configuration; we need to get the
+        // configuration from context.getConfiguration() to retrieve Pig properties.
+        PigSplit pigSplit = (PigSplit) split;
+        pigSplit.setConf(context.getConfiguration());
+        return recordReader;
+    }
 
        private void resetUDFContext() {
                UDFContext.getUDFContext().reset();
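For readers skimming the hunks, here is the patched method in full, reconstructed from the diff above. This is a sketch, not the verbatim file: the @Override annotation and the parameterized RecordReader<Text, Tuple> local type are assumptions (the diff shows the raw type), and only the lines touched by this commit are included.

    @Override
    public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        init();
        resetUDFContext();
        RecordReader<Text, Tuple> recordReader = super.createRecordReader(split, context);
        // PigSplit#conf otherwise holds only a default Hadoop Configuration, so
        // overwrite it with the task's configuration to make job-level Pig
        // properties visible to code that reads them through the split.
        PigSplit pigSplit = (PigSplit) split;
        pigSplit.setConf(context.getConfiguration());
        return recordReader;
    }

The design point, per the in-code comment: without the setConf call the split carries only a default Configuration, so Pig properties set on the job would be missing when consulted at read time. Presumably this is what lets the sampling-related settings exercised by TestPoissonSampleLoader reach the loader in Spark mode, per the PIG-4765 summary.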

