Author: zly Date: Fri May 5 08:39:42 2017 New Revision: 1793981 URL: http://svn.apache.org/viewvc?rev=1793981&view=rev Log: PIG-5215:Merge changes from review board to spark branch(Liyun)
Modified: pig/branches/spark/ivy/libraries.properties pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java Modified: pig/branches/spark/ivy/libraries.properties URL: http://svn.apache.org/viewvc/pig/branches/spark/ivy/libraries.properties?rev=1793981&r1=1793980&r2=1793981&view=diff ============================================================================== --- pig/branches/spark/ivy/libraries.properties (original) +++ pig/branches/spark/ivy/libraries.properties Fri May 5 08:39:42 2017 @@ -17,7 +17,6 @@ accumulo15.version=1.5.0 apacheant.version=1.7.1 apacherat.version=0.8 -asm.version=3.2 automaton.version=1.11-8 avro.version=1.7.5 basjes-httpdlog-pigloader.version=2.4 Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1793981&r1=1793980&r2=1793981&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Fri May 5 08:39:42 2017 @@ -224,7 +224,7 @@ public class PigSplit extends InputSplit else locMap.put(loc, lenInMap + split.getLength()); } catch (InterruptedException e) { - throw new RuntimeException("InputSplit.getLength throws exception: ", e); + throw new IOException("InputSplit.getLength throws exception: ", e); } } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1793981&r1=1793980&r2=1793981&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Fri May 5 08:39:42 2017 @@ -44,7 +44,12 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POBroadcastSpark; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; import org.apache.pig.backend.hadoop.executionengine.spark.converter.FRJoinConverter; import org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1793981&r1=1793980&r2=1793981&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Fri May 5 08:39:42 2017 @@ -389,15 +389,20 @@ public class SparkLauncher extends Launc tmpFolder.deleteOnExit(); for (String file : cacheFiles.split(",")) { String fileName = extractFileName(file.trim()); - Path src = new Path(extractFileUrl(file.trim())); - File tmpFile = new File(tmpFolder, fileName); - Path tmpFilePath = new Path(tmpFile.getAbsolutePath()); - FileSystem fs = tmpFilePath.getFileSystem(jobConf); - fs.copyToLocalFile(src, tmpFilePath); - tmpFile.deleteOnExit(); - LOG.info(String.format("CacheFile:%s", fileName)); - addResourceToSparkJobWorkingDirectory(tmpFile, fileName, - ResourceType.FILE); + if( fileName != null) { + String fileUrl = extractFileUrl(file.trim()); + if( fileUrl != null) { + Path src = new Path(fileUrl); + File tmpFile = new File(tmpFolder, fileName); + Path tmpFilePath = new Path(tmpFile.getAbsolutePath()); + FileSystem fs = tmpFilePath.getFileSystem(jobConf); + fs.copyToLocalFile(src, tmpFilePath); + tmpFile.deleteOnExit(); + LOG.info(String.format("CacheFile:%s", fileName)); + addResourceToSparkJobWorkingDirectory(tmpFile, fileName, + ResourceType.FILE); + } + } } } } @@ -484,26 +489,14 @@ public class SparkLauncher extends Launc String[] tmpAry = cacheFileUrl.split("#"); String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[1] : null; - if (fileName == null) { - throw new RuntimeException("cache file is invalid format, file:" - + cacheFileUrl); - } else { - LOG.debug("Cache file name is valid:" + cacheFileUrl); - return fileName; - } + return fileName; } private String extractFileUrl(String cacheFileUrl) { String[] tmpAry = cacheFileUrl.split("#"); String fileName = tmpAry != null && tmpAry.length == 2 ? tmpAry[0] : null; - if (fileName == null) { - throw new RuntimeException("cache file is invalid format, file:" - + cacheFileUrl); - } else { - LOG.debug("Cache file name is valid:" + cacheFileUrl); - return fileName; - } + return fileName; } public SparkOperPlan compile(PhysicalPlan physicalPlan, Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1793981&r1=1793980&r2=1793981&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Fri May 5 08:39:42 2017 @@ -138,17 +138,12 @@ public class LoadConverter implements RD SparkUtil.getManifest(Tuple.class)); } - private void registerUdfFiles() { + private void registerUdfFiles() throws MalformedURLException{ Map<String, File> scriptFiles = pigContext.getScriptFiles(); for (Map.Entry<String, File> scriptFile : scriptFiles.entrySet()) { - try { - File script = scriptFile.getValue(); - if (script.exists()) { - sparkContext.addFile(script.toURI().toURL().toExternalForm()); - } - } catch (MalformedURLException e) { - String msg = "Problem while registering UDF jars and files in LoadConverter."; - throw new RuntimeException(msg, e); + File script = scriptFile.getValue(); + if (script.exists()) { + sparkContext.addFile(script.toURI().toURL().toExternalForm()); } } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java?rev=1793981&r1=1793980&r2=1793981&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java Fri May 5 08:39:42 2017 @@ -32,8 +32,6 @@ import org.apache.pig.impl.plan.VisitorE public class POPoissonSampleSpark extends POPoissonSample { private static final Log LOG = LogFactory.getLog(POPoissonSampleSpark.class); - //TODO verify can be removed? - //private static final long serialVersionUID = 1L; // Only for Spark private transient boolean endOfInput = false; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java?rev=1793981&r1=1793980&r2=1793981&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java Fri May 5 08:39:42 2017 @@ -84,12 +84,12 @@ public class SecondaryKeyOptimizerSpark try { mapPlan = getMapPlan(sparkOperator.physicalPlan, currentLR); } catch (PlanException e) { - throw new RuntimeException(e); + throw new VisitorException(e); } try { reducePlan = getReducePlan(sparkOperator.physicalPlan, currentLR); } catch (PlanException e) { - throw new RuntimeException(e); + throw new VisitorException(e); } // Current code does not enable secondarykey optimization when join case is encounted Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1793981&r1=1793980&r2=1793981&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Fri May 5 08:39:42 2017 @@ -1352,11 +1352,7 @@ public class SparkCompiler extends PhyPl // if transform plans are not specified, project the columns of sorting keys if (transformPlans == null) { Pair<POProject, Byte>[] sortProjs = null; - try { - sortProjs = getSortCols(sort.getSortPlans()); - } catch (Exception e) { - throw new RuntimeException(e); - } + sortProjs = getSortCols(sort.getSortPlans()); // Set up the projections of the key columns if (sortProjs == null) { PhysicalPlan ep = new PhysicalPlan(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java?rev=1793981&r1=1793980&r2=1793981&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java Fri May 5 08:39:42 2017 @@ -28,19 +28,19 @@ import org.apache.pig.impl.plan.VisitorE */ public class SparkOperPlan extends OperatorPlan<SparkOperator> { - @Override - public String toString() { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PrintStream ps = new PrintStream(baos); - SparkPrinter printer = new SparkPrinter(ps, this); - printer.setVerbose(true); - try { - printer.visit(); - } catch (VisitorException e) { - // TODO Auto-generated catch block - throw new RuntimeException( - "Unable to get String representation of plan:" + e, e); - } - return baos.toString(); - } + @Override + public String toString() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream ps = new PrintStream(baos); + SparkPrinter printer = new SparkPrinter(ps, this); + printer.setVerbose(true); + try { + printer.visit(); + } catch (VisitorException e) { + // TODO Auto-generated catch block + throw new RuntimeException( + "Unable to get String representation of plan:" + e, e); + } + return baos.toString(); + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java?rev=1793981&r1=1793980&r2=1793981&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java Fri May 5 08:39:42 2017 @@ -107,7 +107,7 @@ public class SparkPOPackageAnnotator ext if (pkg.getPkgr() instanceof LitePackager) { if (lrearrange.getIndex() != 0) { - throw new RuntimeException( + throw new VisitorException( "POLocalRearrange for POPackageLite cannot have index other than 0, but has index - " + lrearrange.getIndex()); }