Author: praveen
Date: Wed Dec  3 09:17:35 2014
New Revision: 1643073

URL: http://svn.apache.org/r1643073
Log:
PIG-4239: pig.output.lazy does not work in spark mode (liyunzhang via praveen)

Modified:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1643073&r1=1643072&r2=1643073&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
 Wed Dec  3 09:17:35 2014
@@ -4,8 +4,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.StoreFuncInterface;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
@@ -19,6 +24,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.rdd.PairRDDFunctions;
 import org.apache.spark.rdd.RDD;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 
 import scala.Tuple2;
 
@@ -54,10 +60,23 @@ public class StoreConverter implements
         JobConf storeJobConf = SparkUtil.newJobConf(pigContext);
         POStore poStore = configureStorer(storeJobConf, physicalOperator);
 
-        pairRDDFunctions.saveAsNewAPIHadoopFile(poStore.getSFile()
-                .getFileName(), Text.class, Tuple.class, PigOutputFormat.class,
-                storeJobConf);
-
+        if ("true".equalsIgnoreCase(storeJobConf
+                .get(PigConfiguration.PIG_OUTPUT_LAZY))) {
+            Job storeJob = new Job(storeJobConf);
+            LazyOutputFormat.setOutputFormatClass(storeJob,
+                    PigOutputFormat.class);
+            storeJobConf = (JobConf) storeJob.getConfiguration();
+            storeJobConf.setOutputKeyClass(Text.class);
+            storeJobConf.setOutputValueClass(Tuple.class);
+            String fileName = poStore.getSFile().getFileName();
+            Path filePath = new Path(fileName);
+            FileOutputFormat.setOutputPath(storeJobConf,filePath);
+            pairRDDFunctions.saveAsNewAPIHadoopDataset(storeJobConf);
+        } else {
+            pairRDDFunctions.saveAsNewAPIHadoopFile(poStore.getSFile()
+                    .getFileName(), Text.class, Tuple.class,
+                    PigOutputFormat.class, storeJobConf);
+        }
         return rddPairs.rdd();
     }
 


Reply via email to