Author: xuefu
Date: Thu Jun 11 13:16:34 2015
New Revision: 1684880
URL: http://svn.apache.org/r1684880
Log:
PIG-4585: Use newAPIHadoopRDD instead of newAPIHadoopFile (Mohit via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1684880&r1=1684879&r2=1684880&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java	(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java	Thu Jun 11 13:16:34 2015
@@ -57,10 +57,12 @@ public class SparkUtil {
public static JobConf newJobConf(PigContext pigContext) throws IOException
{
JobConf jobConf = new JobConf(
ConfigurationUtil.toConfiguration(pigContext.getProperties()));
+ // Serialize the PigContext so it's available in Executor thread.
jobConf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
- UDFContext.getUDFContext().serialize(jobConf);
+ // Serialize the thread local variable inside PigContext separately
jobConf.set("udf.import.list",
ObjectSerializer.serialize(PigContext.getPackageImportList()));
+ UDFContext.getUDFContext().serialize(jobConf);
return jobConf;
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1684880&r1=1684879&r2=1684880&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java	(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java	Thu Jun 11 13:16:34 2015
@@ -17,6 +17,8 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+import com.google.common.collect.Lists;
+
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
@@ -25,7 +27,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
@@ -40,25 +43,22 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.ObjectSerializer;
-import com.google.common.collect.Lists;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.SparkContext;
+
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.SparkContext;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+
/**
* Converter that loads data via POLoad and converts it to RRD<Tuple>.
Abuses
- * the interface a bit in that there is no inoput RRD to convert in this case.
+ * the interface a bit in that there is no input RRD to convert in this case.
* Instead input is the source path of the POLoad.
- *
*/
@SuppressWarnings({ "serial" })
public class LoadConverter implements RDDConverter<Tuple, Tuple, POLoad> {
+ private static Log LOG = LogFactory.getLog(LoadConverter.class);
- private static final ToTupleFunction TO_TUPLE_FUNCTION = new
ToTupleFunction();
- private static Log log = LogFactory.getLog(LoadConverter.class);
private PigContext pigContext;
private PhysicalPlan physicalPlan;
private SparkContext sparkContext;
@@ -71,24 +71,26 @@ public class LoadConverter implements RD
}
@Override
- public RDD<Tuple> convert(List<RDD<Tuple>> predecessorRdds, POLoad poLoad)
+ public RDD<Tuple> convert(List<RDD<Tuple>> predecessorRdds, POLoad op)
throws IOException {
- // if (predecessors.size()!=0) {
- // throw new
- // RuntimeException("Should not have predecessors for Load. Got :
"+predecessors);
- // }
-
- JobConf loadJobConf = SparkUtil.newJobConf(pigContext);
- configureLoader(physicalPlan, poLoad, loadJobConf);
-
- // don't know why but just doing this cast for now
- RDD<Tuple2<Text, Tuple>> hadoopRDD = sparkContext.newAPIHadoopFile(
- poLoad.getLFile().getFileName(), PigInputFormatSpark.class,
- Text.class, Tuple.class, loadJobConf);
+
+ // This configuration will be "broadcasted" by Spark, one to every
+ // node. Since we are changing the config here, the safe approach is
+ // to create a new conf for a new RDD.
+ JobConf jobConf = SparkUtil.newJobConf(pigContext);
+ configureLoader(physicalPlan, op, jobConf);
+
+ // Set the input directory for input formats that are backed by a
+ // filesystem. (Does not apply to HBase, for example).
+ jobConf.set("mapreduce.input.fileinputformat.inputdir",
+ op.getLFile().getFileName());
+
+ RDD<Tuple2<Text, Tuple>> hadoopRDD = sparkContext.newAPIHadoopRDD(
+ jobConf, PigInputFormatSpark.class, Text.class, Tuple.class);
registerUdfFiles();
// map to get just RDD<Tuple>
- return hadoopRDD.map(TO_TUPLE_FUNCTION,
+ return hadoopRDD.map(new ToTupleFunction(),
SparkUtil.getManifest(Tuple.class));
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1684880&r1=1684879&r2=1684880&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java	(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java	Thu Jun 11 13:16:34 2015
@@ -17,6 +17,8 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+import com.google.common.collect.Lists;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -28,16 +30,13 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.pig.PigConfiguration;
import org.apache.pig.StoreFuncInterface;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
-import org.apache.pig.builtin.LOG;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -49,8 +48,6 @@ import org.apache.hadoop.mapreduce.lib.o
import scala.Tuple2;
-import com.google.common.collect.Lists;
-
/**
* Converter that takes a POStore and stores it's content.
*/
@@ -59,7 +56,6 @@ public class StoreConverter implements
RDDConverter<Tuple, Tuple2<Text, Tuple>, POStore> {
private static final Log LOG = LogFactory.getLog(StoreConverter.class);
- private static final FromTupleFunction FROM_TUPLE_FUNCTION = new
FromTupleFunction();
private PigContext pigContext;
@@ -69,46 +65,48 @@ public class StoreConverter implements
@Override
public RDD<Tuple2<Text, Tuple>> convert(List<RDD<Tuple>> predecessors,
- POStore physicalOperator) throws IOException {
- SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
+ POStore op) throws IOException {
+ SparkUtil.assertPredecessorSize(predecessors, op, 1);
RDD<Tuple> rdd = predecessors.get(0);
// convert back to KV pairs
- JavaRDD<Tuple2<Text, Tuple>> rddPairs =
rdd.toJavaRDD().map(FROM_TUPLE_FUNCTION);
+ JavaRDD<Tuple2<Text, Tuple>> rddPairs = rdd.toJavaRDD().map(
+ new FromTupleFunction());
PairRDDFunctions<Text, Tuple> pairRDDFunctions = new
PairRDDFunctions<Text, Tuple>(
rddPairs.rdd(), SparkUtil.getManifest(Text.class),
SparkUtil.getManifest(Tuple.class), null);
- JobConf storeJobConf = SparkUtil.newJobConf(pigContext);
- POStore poStore = configureStorer(storeJobConf, physicalOperator);
+ JobConf jobConf = SparkUtil.newJobConf(pigContext);
+ POStore poStore = configureStorer(jobConf, op);
- if ("true".equalsIgnoreCase(storeJobConf
+ if ("true".equalsIgnoreCase(jobConf
.get(PigConfiguration.PIG_OUTPUT_LAZY))) {
- Job storeJob = new Job(storeJobConf);
+ Job storeJob = new Job(jobConf);
LazyOutputFormat.setOutputFormatClass(storeJob,
PigOutputFormat.class);
- storeJobConf = (JobConf) storeJob.getConfiguration();
- storeJobConf.setOutputKeyClass(Text.class);
- storeJobConf.setOutputValueClass(Tuple.class);
+ jobConf = (JobConf) storeJob.getConfiguration();
+ jobConf.setOutputKeyClass(Text.class);
+ jobConf.setOutputValueClass(Tuple.class);
String fileName = poStore.getSFile().getFileName();
Path filePath = new Path(fileName);
- FileOutputFormat.setOutputPath(storeJobConf,filePath);
- pairRDDFunctions.saveAsNewAPIHadoopDataset(storeJobConf);
+ FileOutputFormat.setOutputPath(jobConf,filePath);
+ pairRDDFunctions.saveAsNewAPIHadoopDataset(jobConf);
} else {
pairRDDFunctions.saveAsNewAPIHadoopFile(poStore.getSFile()
.getFileName(), Text.class, Tuple.class,
- PigOutputFormat.class, storeJobConf);
+ PigOutputFormat.class, jobConf);
}
- RDD<Tuple2<Text, Tuple>> retRdd = rddPairs.rdd();
- if (LOG.isDebugEnabled())
- LOG.debug("RDD lineage: " + retRdd.toDebugString());
- return retRdd;
+
+ RDD<Tuple2<Text, Tuple>> retRdd = rddPairs.rdd();
+ if (LOG.isDebugEnabled())
+ LOG.debug("RDD lineage: " + retRdd.toDebugString());
+ return retRdd;
}
private static POStore configureStorer(JobConf jobConf,
- PhysicalOperator physicalOperator) throws IOException {
+ PhysicalOperator op) throws IOException {
ArrayList<POStore> storeLocations = Lists.newArrayList();
- POStore poStore = (POStore) physicalOperator;
+ POStore poStore = (POStore) op;
storeLocations.add(poStore);
StoreFuncInterface sFunc = poStore.getStoreFunc();
sFunc.setStoreLocation(poStore.getSFile().getFileName(),
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1684880&r1=1684879&r2=1684880&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java	(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java	Thu Jun 11 13:16:34 2015
@@ -17,6 +17,8 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark.running;
+import java.io.IOException;
+
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -28,9 +30,8 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStatusReporter;
-import java.io.IOException;
-
public class PigInputFormatSpark extends PigInputFormat {
+
@Override
public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,
@@ -45,9 +46,8 @@ public class PigInputFormatSpark extends
}
private void init() {
- PigStatusReporter pigStatusReporter =
PigStatusReporter.getInstance();
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
- pigHadoopLogger.setReporter(pigStatusReporter);
+ pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
PhysicalOperator.setPigLogger(pigHadoopLogger);
}
-}
+}
\ No newline at end of file