Author: xuefu
Date: Wed Mar 9 04:39:24 2016
New Revision: 1734188
URL: http://svn.apache.org/viewvc?rev=1734188&view=rev
Log:
PIG-4827: Fix TestSample UT failure (Pallavi via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1734188&r1=1734187&r2=1734188&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Mar 9 04:39:24 2016
@@ -46,6 +46,7 @@ import org.apache.pig.backend.BackendExc
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
@@ -192,7 +193,7 @@ public class SparkLauncher extends Launc
physicalPlan, sparkContext.sc()));
convertMap.put(POStore.class, new StoreConverter(pigContext));
convertMap.put(POForEach.class, new ForEachConverter(confBytes));
- convertMap.put(POFilter.class, new FilterConverter());
+ convertMap.put(POFilter.class, new FilterConverter(confBytes));
convertMap.put(POPackage.class, new PackageConverter(confBytes));
convertMap.put(POLocalRearrange.class, new LocalRearrangeConverter());
convertMap.put(POGlobalRearrangeSpark.class, new
GlobalRearrangeConverter());
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java?rev=1734188&r1=1734187&r2=1734188&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java Wed Mar 9 04:39:24 2016
@@ -17,17 +17,28 @@
*/
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+import java.util.UUID;
import scala.runtime.AbstractFunction1;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.PigConstants;
import org.apache.pig.backend.executionengine.ExecException;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.spark.KryoSerializer;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.spark.rdd.RDD;
/**
@@ -36,12 +47,18 @@ import org.apache.spark.rdd.RDD;
@SuppressWarnings({ "serial" })
public class FilterConverter implements RDDConverter<Tuple, Tuple, POFilter> {
+ private byte[] confBytes;
+
+ public FilterConverter(byte[] confBytes) {
+ this.confBytes = confBytes;
+ }
+
@Override
public RDD<Tuple> convert(List<RDD<Tuple>> predecessors,
POFilter physicalOperator) {
SparkUtil.assertPredecessorSize(predecessors, physicalOperator, 1);
RDD<Tuple> rdd = predecessors.get(0);
- FilterFunction filterFunction = new FilterFunction(physicalOperator);
+ FilterFunction filterFunction = new FilterFunction(physicalOperator,
confBytes);
return rdd.filter(filterFunction);
}
@@ -49,13 +66,17 @@ public class FilterConverter implements
AbstractFunction1<Tuple, Object> implements Serializable {
private POFilter poFilter;
+ private byte[] confBytes;
+ private transient JobConf jobConf;
- private FilterFunction(POFilter poFilter) {
+ private FilterFunction(POFilter poFilter, byte[] confBytes) {
this.poFilter = poFilter;
+ this.confBytes = confBytes;
}
@Override
public Boolean apply(Tuple v1) {
+ initializeJobConf();
Result result;
try {
poFilter.setInputs(null);
@@ -80,5 +101,24 @@ public class FilterConverter implements
"Unexpected response code from filter: " + result);
}
}
+
+ void initializeJobConf() {
+ if (this.jobConf != null) {
+ return;
+ }
+ this.jobConf = KryoSerializer.deserializeJobConf(this.confBytes);
+ PigMapReduce.sJobConfInternal.set(jobConf);
+ try {
+ MapRedUtil.setupUDFContext(jobConf);
+ PigContext pc = (PigContext)
ObjectSerializer.deserialize(jobConf.get("pig.pigContext"));
+ SchemaTupleBackend.initialize(jobConf, pc);
+ // Although Job ID and task index are not really applicable for spark,
+ // set them here to overcome PIG-4827
+ jobConf.set(MRConfiguration.JOB_ID,
UUID.randomUUID().toString());
+ jobConf.set(PigConstants.TASK_INDEX, "0");
+ } catch (IOException ioe) {
+ throw new RuntimeException("Problem while configuring
UDFContext from FilterConverter.", ioe);
+ }
+ }
}
}