Author: praveen
Date: Wed Dec 3 09:17:35 2014
New Revision: 1643073
URL: http://svn.apache.org/r1643073
Log:
PIG-4239: pig.output.lazy does not work in spark mode (liyunzhang via praveen)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1643073&r1=1643072&r2=1643073&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java	(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java	Wed Dec 3 09:17:35 2014
@@ -4,8 +4,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.StoreFuncInterface;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
@@ -19,6 +24,7 @@ import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.PairRDDFunctions;
import org.apache.spark.rdd.RDD;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import scala.Tuple2;
@@ -54,10 +60,23 @@ public class StoreConverter implements
JobConf storeJobConf = SparkUtil.newJobConf(pigContext);
POStore poStore = configureStorer(storeJobConf, physicalOperator);
- pairRDDFunctions.saveAsNewAPIHadoopFile(poStore.getSFile()
- .getFileName(), Text.class, Tuple.class, PigOutputFormat.class,
- storeJobConf);
-
+ if ("true".equalsIgnoreCase(storeJobConf
+ .get(PigConfiguration.PIG_OUTPUT_LAZY))) {
+ Job storeJob = new Job(storeJobConf);
+ LazyOutputFormat.setOutputFormatClass(storeJob,
+ PigOutputFormat.class);
+ storeJobConf = (JobConf) storeJob.getConfiguration();
+ storeJobConf.setOutputKeyClass(Text.class);
+ storeJobConf.setOutputValueClass(Tuple.class);
+ String fileName = poStore.getSFile().getFileName();
+ Path filePath = new Path(fileName);
+ FileOutputFormat.setOutputPath(storeJobConf,filePath);
+ pairRDDFunctions.saveAsNewAPIHadoopDataset(storeJobConf);
+ } else {
+ pairRDDFunctions.saveAsNewAPIHadoopFile(poStore.getSFile()
+ .getFileName(), Text.class, Tuple.class,
+ PigOutputFormat.class, storeJobConf);
+ }
return rddPairs.rdd();
}