Author: xuefu
Date: Fri Feb 19 14:28:11 2016
New Revision: 1731248
URL: http://svn.apache.org/viewvc?rev=1731248&view=rev
Log:
PIG-4281: Fix TestFinish for Spark engine (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
pig/branches/spark/test/org/apache/pig/test/TestFinish.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1731248&r1=1731247&r2=1731248&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Fri Feb 19 14:28:11 2016
@@ -30,8 +30,10 @@ import java.util.Set;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -82,6 +84,7 @@ public class JobGraphBuilder extends Spa
new PhyPlanSetter(sparkOp.physicalPlan).visit();
try {
sparkOperToRDD(sparkOp);
+ finishUDFs(sparkOp.physicalPlan);
} catch (InterruptedException e) {
throw new RuntimeException("fail to get the rdds of this spark
operator: ", e);
} catch (JobCreationException e){
@@ -89,6 +92,20 @@ public class JobGraphBuilder extends Spa
}
}
+ // Calling EvalFunc.finish()
+ private void finishUDFs(PhysicalPlan physicalPlan) throws VisitorException
{
+ UDFFinishVisitor finisher = new UDFFinishVisitor(physicalPlan,
+ new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(
+ physicalPlan));
+ try {
+ finisher.visit();
+ } catch (VisitorException e) {
+ int errCode = 2121;
+ String msg = "Error while calling finish method on UDFs.";
+ throw new VisitorException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
private void sparkOperToRDD(SparkOperator sparkOperator) throws
InterruptedException, VisitorException, JobCreationException {
List<SparkOperator> predecessors = sparkPlan
.getPredecessors(sparkOperator);
Modified: pig/branches/spark/test/org/apache/pig/test/TestFinish.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFinish.java?rev=1731248&r1=1731247&r2=1731248&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestFinish.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestFinish.java Fri Feb 19 14:28:11 2016
@@ -67,7 +67,12 @@ public class TestFinish {
@Override
public void finish() {
try {
- FileSystem fs =
FileSystem.get(PigMapReduce.sJobConfInternal.get());
+ FileSystem fs = null;
+ if (execType.equalsIgnoreCase("SPARK")) {
+ fs = FileSystem.get(cluster.getConfiguration());
+ } else {
+ fs = FileSystem.get(PigMapReduce.sJobConfInternal.get());
+ }
fs.create(new Path(expectedFileName));
} catch (IOException e) {
throw new RuntimeException("Unable to create file:" +
expectedFileName);
@@ -136,7 +141,7 @@ public class TestFinish {
String inputFileName = setUp(cluster.getExecType());
// this file will be created on the cluster if finish() is called
String expectedFileName = "testFinishInMapMR-finish.txt";
- pigServer.registerQuery("define MYUDF " +
MyEvalFunction.class.getName() + "('MAPREDUCE','"
+ pigServer.registerQuery("define MYUDF " +
MyEvalFunction.class.getName() + "('"+cluster.getExecType()+"','"
+ expectedFileName + "');");
pigServer.registerQuery("a = load '" +
Util.encodeEscape(inputFileName) + "' using "
+ PigStorage.class.getName() + "(':');");
@@ -155,7 +160,7 @@ public class TestFinish {
String inputFileName = setUp(cluster.getExecType());
// this file will be created on the cluster if finish() is called
String expectedFileName = "testFinishInReduceMR-finish.txt";
- pigServer.registerQuery("define MYUDF " +
MyEvalFunction.class.getName() + "('MAPREDUCE','"
+ pigServer.registerQuery("define MYUDF " +
MyEvalFunction.class.getName() + "('"+cluster.getExecType()+"','"
+ expectedFileName + "');");
pigServer.registerQuery("a = load '" +
Util.encodeEscape(inputFileName) + "' using "
+ PigStorage.class.getName() + "(':');");