Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Feb 24 03:34:37 2017 @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.lang.reflect.Method; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; @@ -60,7 +61,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobPriority; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; -import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.pig.ComparisonFunc; import org.apache.pig.ExecType; import org.apache.pig.FuncSpec; @@ -71,7 +71,6 @@ import org.apache.pig.PigException; import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.HDataType; -import org.apache.pig.backend.hadoop.PigJobControl; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.JobCreationException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner; @@ -90,6 +89,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataType; @@ -122,7 +122,6 @@ import org.apache.pig.impl.util.ObjectSe import org.apache.pig.impl.util.Pair; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.Utils; -import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil; import org.apache.pig.tools.pigstats.mapreduce.MRScriptState; @@ -312,7 +311,7 @@ public class JobControlCompiler{ " should be a time in ms. default=" + defaultPigJobControlSleep, e); } - JobControl jobCtrl = new PigJobControl(grpName, timeToSleep); + JobControl jobCtrl = HadoopShims.newJobControl(grpName, timeToSleep); try { List<MapReduceOper> roots = new LinkedList<MapReduceOper>(); @@ -385,7 +384,7 @@ public class JobControlCompiler{ ArrayList<Pair<String,Long>> counterPairs; try { - counters = MRJobStats.getCounters(job); + counters = HadoopShims.getCounters(job); String groupName = getGroupName(counters.getGroupNames()); // In case that the counter group was not find, we need to find @@ -703,8 +702,7 @@ public class JobControlCompiler{ // since this path would be invalid for the new job being created pigContext.getProperties().remove("mapreduce.job.credentials.binary"); - conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pigContext.getExecType().isLocal()); - conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pigContext.getLog4jProperties())); + conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext)); conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList())); // this is for unit tests since some don't create PigServer @@ -1673,6 +1671,14 @@ public class JobControlCompiler{ if (distCachePath != null) { log.info("Jar file " + url + " already in DistributedCache as " + distCachePath + ". Not copying to hdfs and adding again"); + // Path already in dist cache + if (!HadoopShims.isHadoopYARN()) { + // Mapreduce in YARN includes $PWD/* which will add all *.jar files in classapth. + // So don't have to ensure that the jar is separately added to mapreduce.job.classpath.files + // But path may only be in 'mapred.cache.files' and not be in + // 'mapreduce.job.classpath.files' in Hadoop 1.x. So adding it there + DistributedCache.addFileToClassPath(distCachePath, conf, distCachePath.getFileSystem(conf)); + } } else { // REGISTER always copies locally the jar file. see PigServer.registerJar() @@ -1958,9 +1964,20 @@ public class JobControlCompiler{ public static void setOutputFormat(org.apache.hadoop.mapreduce.Job job) { // the OutputFormat we report to Hadoop is always PigOutputFormat which - // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set + // can be wrapped with LazyOutputFormat provided if it is supported by + // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) { - LazyOutputFormat.setOutputFormatClass(job,PigOutputFormat.class); + try { + Class<?> clazz = PigContext + .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat"); + Method method = clazz.getMethod("setOutputFormatClass", + org.apache.hadoop.mapreduce.Job.class, Class.class); + method.invoke(null, job, PigOutputFormat.class); + } catch (Exception e) { + job.setOutputFormatClass(PigOutputFormat.class); + log.warn(PigConfiguration.PIG_OUTPUT_LAZY + + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used"); + } } else { job.setOutputFormatClass(PigOutputFormat.class); }
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Feb 24 03:34:37 2017 @@ -1116,9 +1116,7 @@ public class MRCompiler extends PhyPlanV try{ nonBlocking(op); phyToMROpMap.put(op, curMROp); - if (op.getPkgr().getPackageType() == PackageType.JOIN - || op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) { - // Bloom join is not implemented in mapreduce mode and falls back to regular join + if (op.getPkgr().getPackageType() == PackageType.JOIN) { curMROp.markRegularJoin(); } else if (op.getPkgr().getPackageType() == PackageType.GROUP) { if (op.getNumInps() == 1) { @@ -1280,7 +1278,7 @@ public class MRCompiler extends PhyPlanV List<InputSplit> splits = inf.getSplits(HadoopShims.cloneJobContext(job)); List<List<InputSplit>> results = MapRedUtil .getCombinePigSplits(splits, - fs.getDefaultBlockSize(path), + HadoopShims.getDefaultBlockSize(fs, path), conf); numFiles += results.size(); } else { @@ -2434,7 +2432,7 @@ public class MRCompiler extends PhyPlanV }else{ for(int i=0; i<transformPlans.size(); i++) { eps1.add(transformPlans.get(i)); - flat1.add(i == transformPlans.size() - 1 ? true : false); + flat1.add(true); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Feb 24 03:34:37 2017 @@ -19,9 +19,7 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.io.PrintStream; -import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Calendar; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -42,8 +40,7 @@ import org.apache.hadoop.mapred.JobClien import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapreduce.Cluster; -import org.apache.hadoop.mapreduce.TaskReport; +import org.apache.hadoop.mapred.TaskReport; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapreduce.TaskType; import org.apache.pig.PigConfiguration; @@ -68,7 +65,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.CompilationMessageCollector; @@ -82,18 +78,15 @@ import org.apache.pig.impl.util.Utils; import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.PigStatsUtil; -import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil; import org.apache.pig.tools.pigstats.mapreduce.MRScriptState; -import org.python.google.common.collect.Lists; - /** * Main class that launches pig for Map Reduce * */ -public class MapReduceLauncher extends Launcher { +public class MapReduceLauncher extends Launcher{ public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; @@ -101,30 +94,14 @@ public class MapReduceLauncher extends L private boolean aggregateWarning = false; - public MapReduceLauncher() { - super(); - Utils.addShutdownHookWithPriority(new HangingJobKiller(), - PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY); - } - @Override public void kill() { try { - if (jc != null && jc.getRunningJobs().size() > 0) { - log.info("Received kill signal"); + log.debug("Receive kill signal"); + if (jc!=null) { for (Job job : jc.getRunningJobs()) { - org.apache.hadoop.mapreduce.Job mrJob = job.getJob(); - try { - if (mrJob != null) { - mrJob.killJob(); - } - } catch (Exception ir) { - throw new IOException(ir); - } + HadoopShims.killJob(job); log.info("Job " + job.getAssignedJobID() + " killed"); - String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") - .format(Calendar.getInstance().getTime()); - System.err.println(timeStamp + " Job " + job.getAssignedJobID() + " killed"); } } } catch (Exception e) { @@ -324,7 +301,8 @@ public class MapReduceLauncher extends L // Now wait, till we are finished. while(!jc.allFinished()){ - jcThread.join(sleepTime); + try { jcThread.join(sleepTime); } + catch (InterruptedException e) {} List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>(); @@ -343,6 +321,11 @@ public class MapReduceLauncher extends L log.info("detailed locations: " + aliasLocation); } + if (!HadoopShims.isHadoopYARN() && jobTrackerLoc != null) { + log.info("More information at: http://" + jobTrackerLoc + + "/jobdetails.jsp?jobid=" + job.getAssignedJobID()); + } + // update statistics for this job so jobId is set MRPigStatsUtil.addJobStats(job); MRScriptState.get().emitJobStartedNotification( @@ -492,6 +475,10 @@ public class MapReduceLauncher extends L for (Job job : succJobs) { List<POStore> sts = jcc.getStores(job); for (POStore st : sts) { + if (Utils.isLocal(pc, job.getJobConf())) { + HadoopShims.storeSchemaForLocal(job, st); + } + if (!st.isTmpStore()) { // create an "_SUCCESS" file in output location if // output location is a filesystem dir @@ -757,7 +744,7 @@ public class MapReduceLauncher extends L @SuppressWarnings("deprecation") void computeWarningAggregate(Job job, Map<Enum, Long> aggMap) { try { - Counters counters = MRJobStats.getCounters(job); + Counters counters = HadoopShims.getCounters(job); if (counters==null) { long nullCounterCount = @@ -811,13 +798,13 @@ public class MapReduceLauncher extends L throw new ExecException(backendException); } try { - Iterator<TaskReport> mapRep = MRJobStats.getTaskReports(job, TaskType.MAP); + Iterator<TaskReport> mapRep = HadoopShims.getTaskReports(job, TaskType.MAP); if (mapRep != null) { getErrorMessages(mapRep, "map", errNotDbg, pigContext); totalHadoopTimeSpent += computeTimeSpent(mapRep); mapRep = null; } - Iterator<TaskReport> redRep = MRJobStats.getTaskReports(job, TaskType.REDUCE); + Iterator<TaskReport> redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE); if (redRep != null) { getErrorMessages(redRep, "reduce", errNotDbg, pigContext); totalHadoopTimeSpent += computeTimeSpent(redRep); @@ -835,6 +822,5 @@ public class MapReduceLauncher extends L throw new ExecException(e); } } - } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Feb 24 03:34:37 2017 @@ -65,10 +65,7 @@ public class MapReduceOper extends Opera // this is needed when the key is null to create // an appropriate NullableXXXWritable object public byte mapKeyType; - - //record the map key types of all splittees - public byte[] mapKeyTypeOfSplittees; - + //Indicates that the map plan creation //is complete boolean mapDone = false; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Fri Feb 24 03:34:37 2017 @@ -18,7 +18,6 @@ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; import java.util.ArrayList; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -581,17 +580,18 @@ class MultiQueryOptimizer extends MROpPl } private boolean hasSameMapKeyType(List<MapReduceOper> splittees) { - Set<Byte> keyTypes = new HashSet<Byte>(); - for (MapReduceOper splittee : splittees) { - keyTypes.add(splittee.mapKeyType); - if (splittee.mapKeyTypeOfSplittees != null) { - for (int i = 0; i < splittee.mapKeyTypeOfSplittees.length; i++) { - keyTypes.add(splittee.mapKeyTypeOfSplittees[i]); + boolean sameKeyType = true; + for (MapReduceOper outer : splittees) { + for (MapReduceOper inner : splittees) { + if (inner.mapKeyType != outer.mapKeyType) { + sameKeyType = false; + break; } } - + if (!sameKeyType) break; } - return keyTypes.size() == 1; + + return sameKeyType; } private int setIndexOnLRInSplit(int initial, POSplit splitOp, boolean sameKeyType) @@ -1035,20 +1035,10 @@ class MultiQueryOptimizer extends MROpPl splitter.mapKeyType = sameKeyType ? mergeList.get(0).mapKeyType : DataType.TUPLE; - - setMapKeyTypeForSplitter(splitter,mergeList); - log.info("Requested parallelism of splitter: " + splitter.getRequestedParallelism()); } - private void setMapKeyTypeForSplitter(MapReduceOper splitter, List<MapReduceOper> mergeList) { - splitter.mapKeyTypeOfSplittees = new byte[mergeList.size()]; - for (int i = 0; i < mergeList.size(); i++) { - splitter.mapKeyTypeOfSplittees[i] = mergeList.get(i).mapKeyType; - } - } - private void mergeSingleMapReduceSplittee(MapReduceOper mapReduce, MapReduceOper splitter, POSplit splitOp) throws VisitorException { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Fri Feb 24 03:34:37 2017 @@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; -import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,11 +36,9 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; -import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; import org.apache.pig.impl.util.ObjectSerializer; @@ -75,6 +72,7 @@ public class PigCombiner { PhysicalOperator[] roots; PhysicalOperator leaf; + PigContext pigContext = null; private volatile boolean initialized = false; //@StaticDataCleanup @@ -93,11 +91,9 @@ public class PigCombiner { Configuration jConf = context.getConfiguration(); try { PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list"))); - Properties log4jProperties = (Properties) ObjectSerializer - .deserialize(jConf.get(PigImplConstants.PIG_LOG4J_PROPERTIES)); - if (log4jProperties != null) { - PropertyConfigurator.configure(log4jProperties); - } + pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext")); + if (pigContext.getLog4jProperties()!=null) + PropertyConfigurator.configure(pigContext.getLog4jProperties()); UDFContext.getUDFContext().reset(); MapRedUtil.setupUDFContext(context.getConfiguration()); @@ -147,7 +143,7 @@ public class PigCombiner { pigReporter.setRep(context); PhysicalOperator.setReporter(pigReporter); - boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning")); + boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning")); PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance(); pigStatusReporter.setContext(new MRTaskContext(context)); PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); @@ -161,7 +157,7 @@ public class PigCombiner { // tuples out of the getnext() call of POJoinPackage // In this case, we process till we see EOP from // POJoinPacakage.getNext() - if (pack.getPkgr() instanceof JoinPackager || pack.getPkgr() instanceof BloomPackager) + if (pack.getPkgr() instanceof JoinPackager) { pack.attachInput(key, tupIter.iterator()); while (true) @@ -272,6 +268,7 @@ public class PigCombiner { pigReporter = null; // Avoid OOM in Tez. PhysicalOperator.setReporter(null); + pigContext = null; roots = null; cp = null; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Fri Feb 24 03:34:37 2017 @@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,7 +45,6 @@ import org.apache.pig.data.SchemaTupleBa import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.io.PigNullableWritable; import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.OperatorKey; @@ -90,6 +88,7 @@ public abstract class PigGenericMapBase private PhysicalOperator leaf; + PigContext pigContext = null; private volatile boolean initialized = false; /** @@ -169,15 +168,13 @@ public abstract class PigGenericMapBase inIllustrator = inIllustrator(context); PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list"))); + pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext")); // This attempts to fetch all of the generated code from the distributed cache, and resolve it - SchemaTupleBackend.initialize(job); + SchemaTupleBackend.initialize(job, pigContext); - Properties log4jProperties = (Properties) ObjectSerializer - .deserialize(job.get(PigImplConstants.PIG_LOG4J_PROPERTIES)); - if (log4jProperties != null) { - PropertyConfigurator.configure(log4jProperties); - } + if (pigContext.getLog4jProperties()!=null) + PropertyConfigurator.configure(pigContext.getLog4jProperties()); if (mp == null) mp = (PhysicalPlan) ObjectSerializer.deserialize( @@ -239,7 +236,7 @@ public abstract class PigGenericMapBase pigReporter.setRep(context); PhysicalOperator.setReporter(pigReporter); - boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning")); + boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning")); PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance(); pigStatusReporter.setContext(new MRTaskContext(context)); PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); @@ -252,7 +249,8 @@ public abstract class PigGenericMapBase MapReducePOStoreImpl impl = new MapReducePOStoreImpl(context); store.setStoreImpl(impl); - store.setUp(); + if (!pigContext.inIllustrator) + store.setUp(); } } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Fri Feb 24 03:34:37 2017 @@ -287,6 +287,7 @@ public class PigGenericMapReduce { private PhysicalOperator leaf; + PigContext pigContext = null; protected volatile boolean initialized = false; private boolean inIllustrator = false; @@ -318,9 +319,10 @@ public class PigGenericMapReduce { sJobConf = context.getConfiguration(); try { PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list"))); + pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext")); // This attempts to fetch all of the generated code from the distributed cache, and resolve it - SchemaTupleBackend.initialize(jConf); + SchemaTupleBackend.initialize(jConf, pigContext); if (rp == null) rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf @@ -375,7 +377,7 @@ public class PigGenericMapReduce { pigReporter.setRep(context); PhysicalOperator.setReporter(pigReporter); - boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning")); + boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning")); PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance(); pigStatusReporter.setContext(new MRTaskContext(context)); PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); @@ -606,7 +608,7 @@ public class PigGenericMapReduce { pigReporter.setRep(context); PhysicalOperator.setReporter(pigReporter); - boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning")); + boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning")); PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance(); pigStatusReporter.setContext(new MRTaskContext(context)); PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Fri Feb 24 03:34:37 2017 @@ -17,6 +17,9 @@ */ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; +import java.util.Map; +import java.util.WeakHashMap; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.EvalFunc; @@ -38,6 +41,7 @@ public final class PigHadoopLogger imple private PigStatusReporter reporter = null; private boolean aggregate = false; + private Map<Object, String> msgMap = new WeakHashMap<Object, String>(); private PigHadoopLogger() { } @@ -64,6 +68,11 @@ public final class PigHadoopLogger imple if (getAggregate()) { if (reporter != null) { + // log at least once + if (msgMap.get(o) == null || !msgMap.get(o).equals(displayMessage)) { + log.warn(displayMessage); + msgMap.put(o, displayMessage); + } if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) { reporter.incrCounter(className, warningEnum.name(), 1); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Fri Feb 24 03:34:37 2017 @@ -197,11 +197,14 @@ public class PigInputFormat extends Inpu ArrayList<FileSpec> inputs; ArrayList<ArrayList<OperatorKey>> inpTargets; + PigContext pigContext; try { inputs = (ArrayList<FileSpec>) ObjectSerializer .deserialize(conf.get(PIG_INPUTS)); inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer .deserialize(conf.get(PIG_INPUT_TARGETS)); + pigContext = (PigContext) ObjectSerializer.deserialize(conf + .get("pig.pigContext")); PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list"))); MapRedUtil.setupUDFContext(conf); } catch (Exception e) { @@ -231,7 +234,7 @@ public class PigInputFormat extends Inpu // if the execution is against Mapred DFS, set // working dir to /user/<userid> - if(!Utils.isLocal(conf)) { + if(!Utils.isLocal(pigContext, conf)) { fs.setWorkingDirectory(jobcontext.getWorkingDirectory()); } @@ -267,7 +270,7 @@ public class PigInputFormat extends Inpu jobcontext.getJobID())); List<InputSplit> oneInputPigSplits = getPigSplits( oneInputSplits, i, inpTargets.get(i), - fs.getDefaultBlockSize(isFsPath? path: fs.getWorkingDirectory()), + HadoopShims.getDefaultBlockSize(fs, isFsPath? path: fs.getWorkingDirectory()), combinable, confClone); splits.addAll(oneInputPigSplits); } catch (ExecException ee) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Fri Feb 24 03:34:37 2017 @@ -18,6 +18,7 @@ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; import java.io.IOException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; @@ -155,7 +156,12 @@ public class PigOutputCommitter extends for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) { if (mapCommitter.first!=null) { try { - allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && mapCommitter.first.isRecoverySupported(); + // Use reflection, Hadoop 1.x line does not have such method + Method m = mapCommitter.first.getClass().getMethod("isRecoverySupported"); + allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery + && (Boolean)m.invoke(mapCommitter.first); + } catch (NoSuchMethodException e) { + allOutputCommitterSupportRecovery = false; } catch (Exception e) { throw new RuntimeException(e); } @@ -167,7 +173,12 @@ public class PigOutputCommitter extends reduceOutputCommitters) { if (reduceCommitter.first!=null) { try { - allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && reduceCommitter.first.isRecoverySupported(); + // Use reflection, Hadoop 1.x line does not have such method + Method m = reduceCommitter.first.getClass().getMethod("isRecoverySupported"); + allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery + && (Boolean)m.invoke(reduceCommitter.first); + } catch (NoSuchMethodException e) { + allOutputCommitterSupportRecovery = false; } catch (Exception e) { throw new RuntimeException(e); } @@ -186,7 +197,10 @@ public class PigOutputCommitter extends mapCommitter.second); try { // Use reflection, Hadoop 1.x line does not have such method - mapCommitter.first.recoverTask(updatedContext); + Method m = mapCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class); + m.invoke(mapCommitter.first, updatedContext); + } catch (NoSuchMethodException e) { + // We are using Hadoop 1.x, ignore } catch (Exception e) { throw new IOException(e); } @@ -198,7 +212,11 @@ public class PigOutputCommitter extends TaskAttemptContext updatedContext = setUpContext(context, reduceCommitter.second); try { - reduceCommitter.first.recoverTask(updatedContext); + // Use reflection, Hadoop 1.x line does not have such method + Method m = reduceCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class); + m.invoke(reduceCommitter.first, updatedContext); + } catch (NoSuchMethodException e) { + // We are using Hadoop 1.x, ignore } catch (Exception e) { throw new IOException(e); } @@ -238,7 +256,10 @@ public class PigOutputCommitter extends mapCommitter.second); // PIG-2642 promote files before calling storeCleanup/storeSchema try { - mapCommitter.first.commitJob(updatedContext); + // Use reflection, 20.2 does not have such method + Method m = mapCommitter.first.getClass().getMethod("commitJob", JobContext.class); + m.setAccessible(true); + m.invoke(mapCommitter.first, updatedContext); } catch (Exception e) { throw new IOException(e); } @@ -252,7 +273,10 @@ public class PigOutputCommitter extends reduceCommitter.second); // PIG-2642 promote files before calling storeCleanup/storeSchema try { - reduceCommitter.first.commitJob(updatedContext); + // Use reflection, 20.2 does not have such method + Method m = reduceCommitter.first.getClass().getMethod("commitJob", JobContext.class); + m.setAccessible(true); + m.invoke(reduceCommitter.first, updatedContext); } catch (Exception e) { throw new IOException(e); } @@ -269,7 +293,10 @@ public class PigOutputCommitter extends JobContext updatedContext = setUpContext(context, mapCommitter.second); try { - mapCommitter.first.abortJob(updatedContext, state); + // Use reflection, 20.2 does not have such method + Method m = mapCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class); + m.setAccessible(true); + m.invoke(mapCommitter.first, updatedContext, state); } catch (Exception e) { throw new IOException(e); } @@ -282,7 +309,10 @@ public class PigOutputCommitter extends JobContext updatedContext = setUpContext(context, reduceCommitter.second); try { - reduceCommitter.first.abortJob(updatedContext, state); + // Use reflection, 20.2 does not have such method + Method m = reduceCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class); + m.setAccessible(true); + m.invoke(reduceCommitter.first, updatedContext, state); } catch (Exception e) { throw new IOException(e); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Fri Feb 24 03:34:37 2017 @@ -515,11 +515,9 @@ public class PigSplit extends InputSplit for (int i = 0; i < wrappedSplits.length; i++) { st.append("Input split["+i+"]:\n Length = "+ wrappedSplits[i].getLength()+"\n ClassName: " + wrappedSplits[i].getClass().getName() + "\n Locations:\n"); - if (wrappedSplits[i]!=null && wrappedSplits[i].getLocations()!=null) { - for (String location : wrappedSplits[i].getLocations()) - st.append(" "+location+"\n"); - st.append("\n-----------------------\n"); - } + for (String location : wrappedSplits[i].getLocations()) + st.append(" "+location+"\n"); + st.append("\n-----------------------\n"); } } catch (IOException e) { return null; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java Fri Feb 24 03:34:37 2017 @@ -26,21 +26,21 @@ public class DiscreteProbabilitySampleGe Random rGen; float[] probVec; float epsilon = 0.0001f; - + private static final Log LOG = LogFactory.getLog(DiscreteProbabilitySampleGenerator.class); - - public DiscreteProbabilitySampleGenerator(long seed, float[] probVec) { - rGen = new Random(seed); + + public DiscreteProbabilitySampleGenerator(float[] probVec) { + rGen = new Random(); float sum = 0.0f; for (float f : probVec) { sum += f; } this.probVec = probVec; - if (1-epsilon > sum || sum > 1+epsilon) { + if (1-epsilon > sum || sum > 1+epsilon) { LOG.info("Sum of probabilities should be near one: " + sum); } } - + public int getNext(){ double toss = rGen.nextDouble(); // if the uniformly random number that I generated @@ -57,13 +57,13 @@ public class DiscreteProbabilitySampleGe toss -= probVec[i]; if(toss<=0.0) return i; - } + } return lastIdx; } - + public static void main(String[] args) { float[] vec = { 0, 0.3f, 0.2f, 0, 0, 0.5f }; - DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(11317, vec); + DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(vec); CountingMap<Integer> cm = new CountingMap<Integer>(); for(int i=0;i<100;i++){ cm.put(gen.getNext(), 1); @@ -75,6 +75,6 @@ public class DiscreteProbabilitySampleGe public String toString() { return Arrays.toString(probVec); } - - + + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Fri Feb 24 03:34:37 2017 @@ -17,6 +17,7 @@ */ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -30,13 +31,13 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.HDataType; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.DataBag; import org.apache.pig.data.InternalMap; import org.apache.pig.data.Tuple; -import org.apache.pig.impl.PigImplConstants; +import org.apache.pig.impl.PigContext; import org.apache.pig.impl.builtin.FindQuantiles; import org.apache.pig.impl.io.NullableBigDecimalWritable; import org.apache.pig.impl.io.NullableBigIntegerWritable; @@ -51,6 +52,7 @@ import org.apache.pig.impl.io.NullableTe import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; import org.apache.pig.impl.io.ReadToEndLoader; +import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.Utils; public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable> @@ -60,6 +62,7 @@ public class WeightedRangePartitioner ex new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>(); protected PigNullableWritable[] quantiles; protected RawComparator<PigNullableWritable> comparator; + private PigContext pigContext; protected Configuration job; protected boolean inited = false; @@ -90,6 +93,11 @@ public class WeightedRangePartitioner ex @SuppressWarnings("unchecked") public void init() { weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>(); + try { + pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext")); + } catch (IOException e) { + throw new RuntimeException("Failed to deserialize pig context: ", e); + } String quantilesFile = job.get("pig.quantilesFile", ""); if (quantilesFile.length() == 0) { @@ -101,10 +109,10 @@ public class WeightedRangePartitioner ex // use local file system to get the quantilesFile Map<String, Object> quantileMap = null; Configuration conf; - if (job.getBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, false)) { - conf = new Configuration(false); + if (!pigContext.getExecType().isLocal()) { + conf = ConfigurationUtil.toConfiguration(pigContext.getProperties()); } else { - conf = new Configuration(job); + conf = new Configuration(false); } if (job.get("fs.file.impl") != null) { conf.set("fs.file.impl", job.get("fs.file.impl")); @@ -130,13 +138,11 @@ public class WeightedRangePartitioner ex DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST); InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS); convertToArray(quantilesList); - long taskIdHashCode = job.get(MRConfiguration.TASK_ID).hashCode(); - long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL); for (Entry<Object, Object> ent : weightedPartsData.entrySet()) { Tuple key = (Tuple)ent.getKey(); // sample item which repeats float[] probVec = getProbVec((Tuple)ent.getValue()); weightedParts.put(getPigNullableWritable(key), - new DiscreteProbabilitySampleGenerator(randomSeed, probVec)); + new DiscreteProbabilitySampleGenerator(probVec)); } } // else - the quantiles file is empty - unless we have a bug, the Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Fri Feb 24 03:34:37 2017 @@ -21,16 +21,14 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; -import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.VisitorException; @@ -107,7 +105,7 @@ public class EndOfAllInputSetter extends public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException { endOfAllInputFlag = true; } - + @Override public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException { endOfAllInputFlag = true; @@ -124,13 +122,6 @@ public class EndOfAllInputSetter extends } } - @Override - public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{ - if (lr instanceof POBuildBloomRearrangeTez) { - endOfAllInputFlag = true; - } - super.visitLocalRearrange(lr); - } /** * @return if end of all input is present Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java Fri Feb 24 03:34:37 2017 @@ -27,7 +27,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter; -import org.apache.pig.impl.plan.DependencyOrderWalker; +import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.VisitorException; /** @@ -43,7 +43,7 @@ public class MRPrinter extends MROpPlanV * @param plan MR plan to print */ public MRPrinter(PrintStream ps, MROperPlan plan) { - super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(plan, true)); + super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan)); mStream = ps; mStream.println("#--------------------------------------------------"); mStream.println("# Map Reduce Plan "); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri Feb 24 03:34:37 2017 @@ -441,10 +441,6 @@ public abstract class PhysicalOperator e public void reset() { } - public boolean isEndOfAllInput() { - return parentPlan.endOfAllInput; - } - /** * @return PigProgressable stored in threadlocal */ Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java Fri Feb 24 03:34:37 2017 @@ -19,10 +19,7 @@ package org.apache.pig.backend.hadoop.ex import java.math.BigDecimal; import java.math.BigInteger; -import java.math.RoundingMode; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.pig.PigWarning; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; @@ -39,8 +36,6 @@ public class Divide extends BinaryExpres * */ private static final long serialVersionUID = 1L; - public static final short BIGDECIMAL_MINIMAL_SCALE = 6; - private static final Log LOG = LogFactory.getLog(Divide.class); public Divide(OperatorKey k) { super(k); @@ -77,22 +72,12 @@ public class Divide extends BinaryExpres case DataType.BIGINTEGER: return ((BigInteger) a).divide((BigInteger) b); case DataType.BIGDECIMAL: - return bigDecimalDivideWithScale(a, b); + return ((BigDecimal) a).divide((BigDecimal) b); default: throw new ExecException("called on unsupported Number class " + DataType.findTypeName(dataType)); } } - private Number bigDecimalDivideWithScale(Number a, Number b) { - // Using same result scaling as Hive. See Arithmetic Rules: - // https://cwiki.apache.org/confluence/download/attachments/27362075/Hive_Decimal_Precision_Scale_Support.pdf - int resultScale = Math.max(BIGDECIMAL_MINIMAL_SCALE, ((BigDecimal)a).scale() + ((BigDecimal)b).precision() + 1); - if (LOG.isDebugEnabled()) { - LOG.debug("For bigdecimal divide: using " + resultScale + " as result scale."); - } - return ((BigDecimal)a).divide((BigDecimal)b, resultScale, RoundingMode.HALF_UP); - } - /* * This method is used to invoke the appropriate method, as Java does not provide generic * dispatch for it. Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Fri Feb 24 03:34:37 2017 @@ -28,7 +28,6 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; import org.apache.pig.LoadCaster; import org.apache.pig.LoadFunc; @@ -90,8 +89,6 @@ public class POCast extends ExpressionOp caster = ((LoadFunc)obj).getLoadCaster(); } else if (obj instanceof StreamToPig) { caster = ((StreamToPig)obj).getLoadCaster(); - } else if (obj instanceof EvalFunc) { - caster = ((EvalFunc)obj).getLoadCaster(); } else { throw new IOException("Invalid class type " + funcSpec.getClassName()); @@ -168,7 +165,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToBigInteger(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "BigInteger for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "BigInteger."; throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -284,7 +281,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToBigDecimal(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "BigDecimal for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "BigDecimal."; throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -399,7 +396,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToBoolean(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "boolean for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "boolean."; throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -513,7 +510,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToInteger(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "int."; throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -639,7 +636,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToLong(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "long for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "long."; throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -762,7 +759,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToDouble(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "double for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "double."; throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -884,7 +881,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToFloat(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "float."; throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -1010,7 +1007,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToDateTime(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "datetime for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "datetime."; throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -1121,7 +1118,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToCharArray(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "string for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "string."; throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -1273,7 +1270,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToTuple(dba.get(), fieldSchema); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "tuple."; throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -1335,7 +1332,7 @@ public class POCast extends ExpressionOp result = caster.bytesToBag(((DataByteArray)obj).get(), fs); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "bag for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "bag."; throw new ExecException(msg, errCode, PigException.INPUT); } } else { @@ -1366,7 +1363,7 @@ public class POCast extends ExpressionOp result = caster.bytesToTuple(((DataByteArray)obj).get(), fs); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "tuple."; throw new ExecException(msg, errCode, PigException.INPUT); } } else { @@ -1391,7 +1388,7 @@ public class POCast extends ExpressionOp result = caster.bytesToMap(((DataByteArray)obj).get(), fs); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "tuple."; throw new ExecException(msg, errCode, PigException.INPUT); } } else { @@ -1405,7 +1402,7 @@ public class POCast extends ExpressionOp result = caster.bytesToBoolean(((DataByteArray) obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "int."; throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1444,7 +1441,7 @@ public class POCast extends ExpressionOp result = caster.bytesToInteger(((DataByteArray) obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "int."; throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1490,7 +1487,7 @@ public class POCast extends ExpressionOp result = caster.bytesToDouble(((DataByteArray) obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "double for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "double."; throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1536,7 +1533,7 @@ public class POCast extends ExpressionOp result = caster.bytesToLong(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "long for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "long."; throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1582,7 +1579,7 @@ public class POCast extends ExpressionOp result = caster.bytesToFloat(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "float."; throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1628,7 +1625,7 @@ public class POCast extends ExpressionOp result = caster.bytesToDateTime(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "datetime for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "datetime."; throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1667,7 +1664,7 @@ public class POCast extends ExpressionOp result = caster.bytesToCharArray(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "float."; throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1715,7 +1712,7 @@ public class POCast extends ExpressionOp result = caster.bytesToBigInteger(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "BigInteger for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "BigInteger."; throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1760,7 +1757,7 @@ public class POCast extends ExpressionOp result = caster.bytesToBigDecimal(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "BigDecimal for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "BigDecimal."; throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1798,10 +1795,6 @@ public class POCast extends ExpressionOp default: throw new ExecException("Cannot convert "+ obj + " to " + fs, 1120, PigException.INPUT); } - case DataType.BYTEARRAY: - //no-op (PIG-4933) - result = obj; - break; default: throw new ExecException("Don't know how to convert "+ obj + " to " + fs, 1120, PigException.INPUT); } @@ -1868,7 +1861,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToBag(dba.get(), fieldSchema); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "bag for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "bag."; throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -1959,7 +1952,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToMap(dba.get(), fieldSchema); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "map for " + this.getOriginalLocations(); + String msg = unknownByteArrayErrorMessage + "map."; throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Fri Feb 24 03:34:37 2017 @@ -158,19 +158,23 @@ public class POProject extends Expressio illustratorMarkup(inpValue, res.result, -1); return res; } else if(columns.size() == 1) { - if ( inpValue == null ) { - // the tuple is null, so a dereference should also produce a null - res.returnStatus = POStatus.STATUS_OK; - ret = null; - } else if( inpValue.size() > columns.get(0) ) { + try { ret = inpValue.get(columns.get(0)); - } else { + } catch (IndexOutOfBoundsException ie) { if(pigLogger != null) { pigLogger.warn(this,"Attempt to access field " + "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD); } res.returnStatus = POStatus.STATUS_OK; ret = null; + } catch (NullPointerException npe) { + // the tuple is null, so a dereference should also produce a null + // there is a slight danger here that the Tuple implementation + // may have given the exception for a different reason but if we + // don't catch it, we will die and the most common case for the + // exception would be because the tuple is null + res.returnStatus = POStatus.STATUS_OK; + ret = null; } } else if(isProjectToEnd){ ret = getRangeTuple(inpValue); @@ -211,18 +215,23 @@ public class POProject extends Expressio */ private void addColumn(ArrayList<Object> objList, Tuple inpValue, int i) throws ExecException { - if( inpValue == null ) { - // the tuple is null, so a dereference should also produce a null - objList.add(null); - } else if( inpValue.size() > i ) { + try { objList.add(inpValue.get(i)); - } else { + } catch (IndexOutOfBoundsException ie) { if(pigLogger != null) { pigLogger.warn(this,"Attempt to access field " + i + " which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD); } objList.add(null); } + catch (NullPointerException npe) { + // the tuple is null, so a dereference should also produce a null + // there is a slight danger here that the Tuple implementation + // may have given the exception for a different reason but if we + // don't catch it, we will die and the most common case for the + // exception would be because the tuple is null + objList.add(null); + } } @Override @@ -397,17 +406,21 @@ public class POProject extends Expressio Object ret; if(columns.size() == 1) { - if( inpValue == null ) { - // the tuple is null, so a dereference should also produce a null - ret = null; - } else if( inpValue.size() > columns.get(0) ) { + try{ ret = inpValue.get(columns.get(0)); - } else { + } catch (IndexOutOfBoundsException ie) { if(pigLogger != null) { pigLogger.warn(this,"Attempt to access field " + "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD); } ret = null; + } catch (NullPointerException npe) { + // the tuple is null, so a dereference should also produce a null + // there is a slight danger here that the Tuple implementation + // may have given the exception for a different reason but if we + // don't catch it, we will die and the most common case for the + // exception would be because the tuple is null + ret = null; } } else if(isProjectToEnd) { ret = getRangeTuple(inpValue); @@ -415,17 +428,21 @@ public class POProject extends Expressio ArrayList<Object> objList = new ArrayList<Object>(columns.size()); for(int col: columns) { - if( inpValue == null ) { - // the tuple is null, so a dereference should also produce a null - objList.add(null); - } else if( inpValue.size() > col ) { + try { objList.add(inpValue.get(col)); - } else { + } catch (IndexOutOfBoundsException ie) { if(pigLogger != null) { pigLogger.warn(this,"Attempt to access field " + "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD); } objList.add(null); + } catch (NullPointerException npe) { + // the tuple is null, so a dereference should also produce a null + // there is a slight danger here that the Tuple implementation + // may have given the exception for a different reason but if we + // don't catch it, we will die and the most common case for the + // exception would be because the tuple is null + objList.add(null); } } ret = mTupleFactory.newTuple(objList); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Fri Feb 24 03:34:37 2017 @@ -49,7 +49,7 @@ public class CombinerPackager extends Pa private Map<Integer, Integer> keyLookup; private int numBags; - + private transient boolean initialized; private transient boolean useDefaultBag; @@ -77,15 +77,6 @@ public class CombinerPackager extends Pa } } - @Override - public void attachInput(Object key, DataBag[] bags, boolean[] readOnce) - throws ExecException { - this.key = key; - this.bags = bags; - this.readOnce = readOnce; - // Bag can be read directly and need not be materialized again - } - /** * @param keyInfo the keyInfo to set */ Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Fri Feb 24 03:34:37 2017 @@ -17,7 +17,7 @@ */ /** - * + * */ package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; @@ -28,7 +28,6 @@ import java.util.Map; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; -import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; @@ -49,15 +48,6 @@ public class LitePackager extends Packag private PigNullableWritable keyWritable; @Override - public void attachInput(Object key, DataBag[] bags, boolean[] readOnce) - throws ExecException { - this.key = key; - this.bags = bags; - this.readOnce = readOnce; - // Bag can be read directly and need not be materialized again - } - - @Override public boolean[] getInner() { return null; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java Fri Feb 24 03:34:37 2017 @@ -256,9 +256,4 @@ public class POCross extends PhysicalOpe data = null; } - @Override - public void reset() { - clearMemory(); - } - }
