Author: xuefu
Date: Tue Dec 22 20:16:37 2015
New Revision: 1721458
URL: http://svn.apache.org/viewvc?rev=1721458&view=rev
Log:
PIG-4765: Enable TestPoissonSampleLoader in spark mode (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1721458&r1=1721457&r2=1721458&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
Tue Dec 22 20:16:37 2015
@@ -19,12 +19,14 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;
@@ -36,10 +38,15 @@ public class PigInputFormatSpark extends
public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException {
- init();
- resetUDFContext();
- return super.createRecordReader(split, context);
- }
+ init();
+ resetUDFContext();
+ RecordReader recordReader = super.createRecordReader(split, context);
+ //PigSplit#conf is the default hadoop configuration, we need get the
configuration
+ //from context.getConfigration() to retrieve pig properties
+ PigSplit pigSplit = (PigSplit) split;
+ pigSplit.setConf(context.getConfiguration());
+ return recordReader;
+ }
private void resetUDFContext() {
UDFContext.getUDFContext().reset();