Author: xuefu
Date: Thu Jun 11 13:16:34 2015
New Revision: 1684880

URL: http://svn.apache.org/r1684880
Log:
PIG-4585: Use newAPIHadoopRDD instead of newAPIHadoopFile (Mohit via Xuefu)
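
For context, the Spark API change at the heart of this patch can be sketched as
follows (illustrative fragment, not from the patch; sc, conf, and the input
path are assumptions):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    JavaSparkContext sc = new JavaSparkContext("local", "pig-4585-sketch");
    Configuration conf = new Configuration();

    // Before: newAPIHadoopFile takes the input path as an explicit argument,
    // which assumes the input format reads from a file path.
    JavaPairRDD<LongWritable, Text> viaFile = sc.newAPIHadoopFile(
            "hdfs:///tmp/input", TextInputFormat.class,
            LongWritable.class, Text.class, conf);

    // After: newAPIHadoopRDD is driven entirely by the Configuration, so
    // input formats that are not file-backed (e.g. HBase) also work.
    conf.set("mapreduce.input.fileinputformat.inputdir", "hdfs:///tmp/input");
    JavaPairRDD<LongWritable, Text> viaConf = sc.newAPIHadoopRDD(
            conf, TextInputFormat.class, LongWritable.class, Text.class);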

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1684880&r1=1684879&r2=1684880&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Thu Jun 11 13:16:34 2015
@@ -57,10 +57,12 @@ public class SparkUtil {
     public static JobConf newJobConf(PigContext pigContext) throws IOException {
         JobConf jobConf = new JobConf(
                 ConfigurationUtil.toConfiguration(pigContext.getProperties()));
+        // Serialize the PigContext so it's available in Executor thread.
         jobConf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
-        UDFContext.getUDFContext().serialize(jobConf);
+        // Serialize the thread local variable inside PigContext separately
         jobConf.set("udf.import.list",
                 ObjectSerializer.serialize(PigContext.getPackageImportList()));
+        UDFContext.getUDFContext().serialize(jobConf);
         return jobConf;
     }
 
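newJobConf() above serializes three pieces of front-end state into the JobConf.
A hedged sketch of the executor-side counterpart that reads them back (an
assumption for illustration; this patch only touches the writing side):

    import java.util.ArrayList;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.impl.util.ObjectSerializer;
    import org.apache.pig.impl.util.UDFContext;

    // Hypothetical fragment: recover what newJobConf() wrote, using the
    // same keys the patch sets.
    PigContext pc = (PigContext) ObjectSerializer.deserialize(
            jobConf.get("pig.pigContext"));
    PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
            .deserialize(jobConf.get("udf.import.list")));
    UDFContext.getUDFContext().addJobConf(jobConf);
    UDFContext.getUDFContext().deserialize();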

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1684880&r1=1684879&r2=1684880&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Thu Jun 11 13:16:34 2015
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark.converter;
 
+import com.google.common.collect.Lists;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
@@ -25,7 +27,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
@@ -40,25 +43,22 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.ObjectSerializer;
-import com.google.common.collect.Lists;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.SparkContext;
+
 import scala.Function1;
 import scala.Tuple2;
 import scala.runtime.AbstractFunction1;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.SparkContext;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+
 /**
  * Converter that loads data via POLoad and converts it to RRD<Tuple>. Abuses
- * the interface a bit in that there is no inoput RRD to convert in this case.
+ * the interface a bit in that there is no input RRD to convert in this case.
  * Instead input is the source path of the POLoad.
- *
  */
 @SuppressWarnings({ "serial" })
 public class LoadConverter implements RDDConverter<Tuple, Tuple, POLoad> {
+    private static Log LOG = LogFactory.getLog(LoadConverter.class);
 
-    private static final ToTupleFunction TO_TUPLE_FUNCTION = new ToTupleFunction();
-    private static Log log = LogFactory.getLog(LoadConverter.class);
     private PigContext pigContext;
     private PhysicalPlan physicalPlan;
     private SparkContext sparkContext;
@@ -71,24 +71,26 @@ public class LoadConverter implements RD
     }
 
     @Override
-    public RDD<Tuple> convert(List<RDD<Tuple>> predecessorRdds, POLoad poLoad)
+    public RDD<Tuple> convert(List<RDD<Tuple>> predecessorRdds, POLoad op)
             throws IOException {
-        // if (predecessors.size()!=0) {
-        // throw new
-        // RuntimeException("Should not have predecessors for Load. Got : "+predecessors);
-        // }
-
-        JobConf loadJobConf = SparkUtil.newJobConf(pigContext);
-        configureLoader(physicalPlan, poLoad, loadJobConf);
-
-        // don't know why but just doing this cast for now
-        RDD<Tuple2<Text, Tuple>> hadoopRDD = sparkContext.newAPIHadoopFile(
-                poLoad.getLFile().getFileName(), PigInputFormatSpark.class,
-                Text.class, Tuple.class, loadJobConf);
+
+        // This configuration will be "broadcasted" by Spark, one to every
+        // node. Since we are changing the config here, the safe approach is
+        // to create a new conf for a new RDD.
+        JobConf jobConf = SparkUtil.newJobConf(pigContext);
+        configureLoader(physicalPlan, op, jobConf);
+
+        // Set the input directory for input formats that are backed by a
+        // filesystem. (Does not apply to HBase, for example).
+        jobConf.set("mapreduce.input.fileinputformat.inputdir",
+                op.getLFile().getFileName());
+
+        RDD<Tuple2<Text, Tuple>> hadoopRDD = sparkContext.newAPIHadoopRDD(
+                jobConf, PigInputFormatSpark.class, Text.class, Tuple.class);
 
         registerUdfFiles();
         // map to get just RDD<Tuple>
-        return hadoopRDD.map(TO_TUPLE_FUNCTION,
+        return hadoopRDD.map(new ToTupleFunction(),
                 SparkUtil.getManifest(Tuple.class));
     }
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1684880&r1=1684879&r2=1684880&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java Thu Jun 11 13:16:34 2015
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark.converter;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -28,16 +30,13 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
-import org.apache.pig.builtin.LOG;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -49,8 +48,6 @@ import org.apache.hadoop.mapreduce.lib.o
 
 import scala.Tuple2;
 
-import com.google.common.collect.Lists;
-
 /**
  * Converter that takes a POStore and stores it's content.
  */
@@ -59,7 +56,6 @@ public class StoreConverter implements
         RDDConverter<Tuple, Tuple2<Text, Tuple>, POStore> {
 
   private static final Log LOG = LogFactory.getLog(StoreConverter.class);
-  private static final FromTupleFunction FROM_TUPLE_FUNCTION = new FromTupleFunction();
 
     private PigContext pigContext;
 
@@ -69,46 +65,48 @@ public class StoreConverter implements
 
     @Override
     public RDD<Tuple2<Text, Tuple>> convert(List<RDD<Tuple>> predecessors,
-            POStore physicalOperator) throws IOException {
-        SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
+            POStore op) throws IOException {
+        SparkUtil.assertPredecessorSize(predecessors, op, 1);
         RDD<Tuple> rdd = predecessors.get(0);
         // convert back to KV pairs
-        JavaRDD<Tuple2<Text, Tuple>> rddPairs = rdd.toJavaRDD().map(FROM_TUPLE_FUNCTION);
+        JavaRDD<Tuple2<Text, Tuple>> rddPairs = rdd.toJavaRDD().map(
+                new FromTupleFunction());
 
         PairRDDFunctions<Text, Tuple> pairRDDFunctions = new PairRDDFunctions<Text, Tuple>(
                 rddPairs.rdd(), SparkUtil.getManifest(Text.class),
                 SparkUtil.getManifest(Tuple.class), null);
 
-        JobConf storeJobConf = SparkUtil.newJobConf(pigContext);
-        POStore poStore = configureStorer(storeJobConf, physicalOperator);
+        JobConf jobConf = SparkUtil.newJobConf(pigContext);
+        POStore poStore = configureStorer(jobConf, op);
 
-        if ("true".equalsIgnoreCase(storeJobConf
+        if ("true".equalsIgnoreCase(jobConf
                 .get(PigConfiguration.PIG_OUTPUT_LAZY))) {
-            Job storeJob = new Job(storeJobConf);
+            Job storeJob = new Job(jobConf);
             LazyOutputFormat.setOutputFormatClass(storeJob,
                     PigOutputFormat.class);
-            storeJobConf = (JobConf) storeJob.getConfiguration();
-            storeJobConf.setOutputKeyClass(Text.class);
-            storeJobConf.setOutputValueClass(Tuple.class);
+            jobConf = (JobConf) storeJob.getConfiguration();
+            jobConf.setOutputKeyClass(Text.class);
+            jobConf.setOutputValueClass(Tuple.class);
             String fileName = poStore.getSFile().getFileName();
             Path filePath = new Path(fileName);
-            FileOutputFormat.setOutputPath(storeJobConf,filePath);
-            pairRDDFunctions.saveAsNewAPIHadoopDataset(storeJobConf);
+            FileOutputFormat.setOutputPath(jobConf,filePath);
+            pairRDDFunctions.saveAsNewAPIHadoopDataset(jobConf);
         } else {
             pairRDDFunctions.saveAsNewAPIHadoopFile(poStore.getSFile()
                     .getFileName(), Text.class, Tuple.class,
-                    PigOutputFormat.class, storeJobConf);
+                    PigOutputFormat.class, jobConf);
         }
-      RDD<Tuple2<Text, Tuple>> retRdd = rddPairs.rdd();
-      if (LOG.isDebugEnabled())
-          LOG.debug("RDD lineage: " + retRdd.toDebugString());
-      return retRdd;
+
+        RDD<Tuple2<Text, Tuple>> retRdd = rddPairs.rdd();
+        if (LOG.isDebugEnabled())
+            LOG.debug("RDD lineage: " + retRdd.toDebugString());
+        return retRdd;
     }
 
     private static POStore configureStorer(JobConf jobConf,
-            PhysicalOperator physicalOperator) throws IOException {
+            PhysicalOperator op) throws IOException {
         ArrayList<POStore> storeLocations = Lists.newArrayList();
-        POStore poStore = (POStore) physicalOperator;
+        POStore poStore = (POStore) op;
         storeLocations.add(poStore);
         StoreFuncInterface sFunc = poStore.getStoreFunc();
         sFunc.setStoreLocation(poStore.getSFile().getFileName(),
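
On the store side, the patch keeps two write paths: when PIG_OUTPUT_LAZY is
set, output goes through saveAsNewAPIHadoopDataset so that LazyOutputFormat
(which creates part files only when the first record is written) can wrap
PigOutputFormat; otherwise saveAsNewAPIHadoopFile is used directly. A hedged
sketch of the distinction (fragment; pairRDDFunctions, jobConf, and the output
path are assumptions):

    // File variant: Spark puts the path and the key/value/output-format
    // classes into the conf for you.
    pairRDDFunctions.saveAsNewAPIHadoopFile("/tmp/out", Text.class,
            Tuple.class, PigOutputFormat.class, jobConf);

    // Dataset variant: the conf must already carry everything, including the
    // path set via FileOutputFormat.setOutputPath(jobConf, ...). That is what
    // lets LazyOutputFormat be installed as the output format above.
    pairRDDFunctions.saveAsNewAPIHadoopDataset(jobConf);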

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1684880&r1=1684879&r2=1684880&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Thu Jun 11 13:16:34 2015
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.spark.running;
 
+import java.io.IOException;
+
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -28,9 +30,8 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
-import java.io.IOException;
-
 public class PigInputFormatSpark extends PigInputFormat {
+
        @Override
        public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
                        TaskAttemptContext context) throws IOException,
@@ -45,9 +46,8 @@ public class PigInputFormatSpark extends
        }
 
        private void init() {
-               PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
                PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
-               pigHadoopLogger.setReporter(pigStatusReporter);
+               pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
                PhysicalOperator.setPigLogger(pigHadoopLogger);
        }
-}
+}
\ No newline at end of file

