Author: rohini
Date: Wed May 17 21:52:19 2017
New Revision: 1795466
URL: http://svn.apache.org/viewvc?rev=1795466&view=rev
Log:
PIG-5135: HDFS bytes read stats are always 0 in Spark mode (szita via rohini)
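Background: Spark appears to credit HDFS bytes read to a task only when it can recognize the task's input split as a Hadoop FileSplit, so file-backed splits hidden inside Pig's own PigSplit wrapper were reported as 0. The diff below therefore (a) factors record-reader construction in PigInputFormat into a reusable RecordReaderFactory and (b) has PigInputFormatSpark.getSplits() wrap every PigSplit in a SparkPigSplit, choosing the file-backed variant when all wrapped splits are FileSplits. The SparkPigSplit classes themselves are not part of this diff; the following is only a minimal sketch of the idea, under the assumption that the file-backed wrapper extends FileSplit and delegates to the wrapped PigSplit (the class name and details here are hypothetical, not the real SparkPigSplit.FileSparkPigSplit):

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;

    // Hypothetical sketch: by *being* a FileSplit, the wrapper lets Spark see the
    // underlying file and attribute HDFS bytes read to it, while the Pig-specific
    // state stays in the wrapped PigSplit.
    public class FileBackedPigSplitSketch extends FileSplit {

        private final PigSplit wrappedPigSplit;

        public FileBackedPigSplitSketch(PigSplit pigSplit) throws IOException, InterruptedException {
            // The caller guarantees the wrapped split is a FileSplit
            // (see the isFileSplits check in PigInputFormatSpark.getSplits() below).
            super(((FileSplit) pigSplit.getWrappedSplit()).getPath(),
                  ((FileSplit) pigSplit.getWrappedSplit()).getStart(),
                  ((FileSplit) pigSplit.getWrappedSplit()).getLength(),
                  pigSplit.getLocations());
            this.wrappedPigSplit = pigSplit;
        }

        // Mirrors the accessor used in PigInputFormatSpark.createRecordReader().
        public PigSplit getWrappedPigSplit() {
            return wrappedPigSplit;
        }
    }

Non-file inputs fall back to the generic wrapper, for which Spark presumably still cannot report bytes read; the real classes also need to be serializable so the splits can be shipped to Spark executors, which this sketch omits.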
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
pig/branches/spark/test/excluded-tests-spark
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1795466&r1=1795465&r2=1795466&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Wed May 17 21:52:19 2017
@@ -61,14 +61,6 @@ public class PigInputFormat extends Inpu
public static final String PIG_INPUT_SIGNATURES = "pig.inpSignatures";
public static final String PIG_INPUT_LIMITS = "pig.inpLimits";
- /**
- * @deprecated Use {@link UDFContext} instead in the following way to get
- * the job's {@link Configuration}:
- * <pre>UdfContext.getUdfContext().getJobConf()</pre>
- */
- @Deprecated
- public static Configuration sJob;
-
/* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@@ -78,43 +70,66 @@ public class PigInputFormat extends Inpu
org.apache.hadoop.mapreduce.InputSplit split,
TaskAttemptContext context) throws IOException,
InterruptedException {
- // We need to create a TaskAttemptContext based on the Configuration which
- // was used in the getSplits() to produce the split supplied here. For
- // this, let's find out the input of the script which produced the split
- // supplied here and then get the corresponding Configuration and setup
- // TaskAttemptContext based on it and then call the real InputFormat's
- // createRecordReader() method
-
- PigSplit pigSplit = (PigSplit)split;
- activeSplit = pigSplit;
- // XXX hadoop 20 new API integration: get around a hadoop 20 bug by
- // passing total # of splits to each split so it can be retrieved
- // here and set it to the configuration object. This number is needed
- // by PoissonSampleLoader to compute the number of samples
- int n = pigSplit.getTotalSplits();
- context.getConfiguration().setInt("pig.mapsplits.count", n);
- Configuration conf = context.getConfiguration();
- PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
- .deserialize(conf.get("udf.import.list")));
- MapRedUtil.setupUDFContext(conf);
- LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf);
- // Pass loader signature to LoadFunc and to InputFormat through
- // the conf
- passLoadSignature(loadFunc, pigSplit.getInputIndex(), conf);
-
- // merge entries from split specific conf into the conf we got
- PigInputFormat.mergeSplitSpecificConf(loadFunc, pigSplit, conf);
-
- // for backward compatibility
- PigInputFormat.sJob = conf;
+ RecordReaderFactory factory = new RecordReaderFactory(split, context);
+ return factory.createRecordReader();
+ }
- InputFormat inputFormat = loadFunc.getInputFormat();
- List<Long> inpLimitLists =
- (ArrayList<Long>)ObjectSerializer.deserialize(
- conf.get(PIG_INPUT_LIMITS));
+ /**
+ * Helper class to create record reader
+ */
+ protected static class RecordReaderFactory {
+ protected InputFormat inputFormat;
+ protected PigSplit pigSplit;
+ protected LoadFunc loadFunc;
+ protected TaskAttemptContext context;
+ protected long limit;
+
+ public RecordReaderFactory(org.apache.hadoop.mapreduce.InputSplit split,
+ TaskAttemptContext context) throws IOException {
+
+ // We need to create a TaskAttemptContext based on the Configuration which
+ // was used in the getSplits() to produce the split supplied here. For
+ // this, let's find out the input of the script which produced the split
+ // supplied here and then get the corresponding Configuration and setup
+ // TaskAttemptContext based on it and then call the real InputFormat's
+ // createRecordReader() method
+
+ PigSplit pigSplit = (PigSplit)split;
+ // XXX hadoop 20 new API integration: get around a hadoop 20 bug by
+ // passing total # of splits to each split so it can be retrieved
+ // here and set it to the configuration object. This number is needed
+ // by PoissonSampleLoader to compute the number of samples
+ int n = pigSplit.getTotalSplits();
+ context.getConfiguration().setInt("pig.mapsplits.count", n);
+ Configuration conf = context.getConfiguration();
+ PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
+ .deserialize(conf.get("udf.import.list")));
+ MapRedUtil.setupUDFContext(conf);
+ LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf);
+ // Pass loader signature to LoadFunc and to InputFormat through
+ // the conf
+ passLoadSignature(loadFunc, pigSplit.getInputIndex(), conf);
+
+ // merge entries from split specific conf into the conf we got
+ PigInputFormat.mergeSplitSpecificConf(loadFunc, pigSplit, conf);
+
+ InputFormat inputFormat = loadFunc.getInputFormat();
+
+ List<Long> inpLimitLists =
+ (ArrayList<Long>)ObjectSerializer.deserialize(
+ conf.get(PIG_INPUT_LIMITS));
+
+ this.inputFormat = inputFormat;
+ this.pigSplit = pigSplit;
+ this.loadFunc = loadFunc;
+ this.context = context;
+ this.limit = inpLimitLists.get(pigSplit.getInputIndex());
+ }
- return new PigRecordReader(inputFormat, pigSplit, loadFunc, context, inpLimitLists.get(pigSplit.getInputIndex()));
+ public org.apache.hadoop.mapreduce.RecordReader<Text, Tuple> createRecordReader() throws IOException, InterruptedException {
+ return new PigRecordReader(inputFormat, pigSplit, loadFunc, context, limit);
+ }
}
@@ -339,10 +354,4 @@ public class PigInputFormat extends Inpu
return pigSplit;
}
- public static PigSplit getActiveSplit() {
- return activeSplit;
- }
-
- private static PigSplit activeSplit;
-
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1795466&r1=1795465&r2=1795466&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Wed May 17 21:52:19 2017
@@ -18,19 +18,25 @@
package org.apache.pig.backend.hadoop.executionengine.spark.running;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigRecordReader;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkPigSplit;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
@@ -42,14 +48,13 @@ import org.apache.pig.tools.pigstats.spa
public class PigInputFormatSpark extends PigInputFormat {
- @Override
- public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException,
- InterruptedException {
+ @Override
+ public RecordReader<Text, Tuple> createRecordReader(InputSplit split, TaskAttemptContext context) throws
+ IOException, InterruptedException {
resetUDFContext();
//PigSplit#conf is the default hadoop configuration, we need get the configuration
//from context.getConfigration() to retrieve pig properties
- PigSplit pigSplit = (PigSplit) split;
+ PigSplit pigSplit = ((SparkPigSplit) split).getWrappedPigSplit();
Configuration conf = context.getConfiguration();
pigSplit.setConf(conf);
//Set current splitIndex in PigMapReduce.sJobContext.getConfiguration.get(PigImplConstants.PIG_SPLIT_INDEX)
@@ -61,7 +66,42 @@ public class PigInputFormatSpark extends
// Here JobConf is first available in spark Executor thread, we initialize PigContext,UDFContext and
// SchemaTupleBackend by reading properties from JobConf
initialize(conf);
- return super.createRecordReader(split, context);
+
+ SparkRecordReaderFactory sparkRecordReaderFactory = new SparkRecordReaderFactory(pigSplit, context);
+ return sparkRecordReaderFactory.createRecordReader();
+ }
+
+ /**
+ * This is where we have to wrap PigSplits into SparkPigSplits
+ * @param jobcontext
+ * @return
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public List<InputSplit> getSplits(JobContext jobcontext) throws IOException, InterruptedException {
+ List<InputSplit> sparkPigSplits = new ArrayList<>();
+ List<InputSplit> originalSplits = super.getSplits(jobcontext);
+
+ boolean isFileSplits = true;
+ for (InputSplit inputSplit : originalSplits) {
+ PigSplit split = (PigSplit)inputSplit;
+ if (!(split.getWrappedSplit() instanceof FileSplit)) {
+ isFileSplits = false;
+ break;
+ }
+ }
+
+ for (InputSplit inputSplit : originalSplits) {
+ PigSplit split = (PigSplit) inputSplit;
+ if (!isFileSplits) {
+ sparkPigSplits.add(new SparkPigSplit.GenericSparkPigSplit(split));
+ } else {
+ sparkPigSplits.add(new SparkPigSplit.FileSparkPigSplit(split));
+ }
+ }
+
+ return sparkPigSplits;
}
private void initialize(Configuration jobConf) throws IOException {
@@ -78,4 +118,17 @@ public class PigInputFormatSpark extends
private void resetUDFContext() {
UDFContext.getUDFContext().reset();
}
+
+
+ static class SparkRecordReaderFactory extends PigInputFormat.RecordReaderFactory {
+
+ public SparkRecordReaderFactory(InputSplit split, TaskAttemptContext context) throws IOException {
+ super(split, context);
+ }
+
+ @Override
+ public RecordReader<Text, Tuple> createRecordReader() throws IOException, InterruptedException {
+ return new SparkPigRecordReader(inputFormat, pigSplit, loadFunc, context, limit);
+ }
+ }
}
\ No newline at end of file
Modified: pig/branches/spark/test/excluded-tests-spark
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/excluded-tests-spark?rev=1795466&r1=1795465&r2=1795466&view=diff
==============================================================================
--- pig/branches/spark/test/excluded-tests-spark (original)
+++ pig/branches/spark/test/excluded-tests-spark Wed May 17 21:52:19 2017
@@ -2,6 +2,3 @@
**/tez/*.java
**/TestNativeMapReduce.java
**/TestCounters.java
-
-#TODO: PIG-5135 fix for Spark mode
-**/TestOrcStoragePushdown.java