Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Wed Feb 22 09:43:41 2017 @@ -32,7 +32,8 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.TaskReport; +import org.apache.hadoop.mapred.TIPStatus; +import org.apache.hadoop.mapreduce.TaskReport; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; import org.apache.pig.FuncSpec; @@ -40,7 +41,6 @@ import org.apache.pig.PigException; import org.apache.pig.backend.BackendException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.PlanException; @@ -76,7 +76,7 @@ public abstract class Launcher { protected Map<FileSpec, Exception> failureMap; protected JobControl jc = null; - class HangingJobKiller extends Thread { + protected class HangingJobKiller extends Thread { public HangingJobKiller() {} @Override @@ -90,7 +90,6 @@ public abstract class Launcher { } protected Launcher() { - Runtime.getRuntime().addShutdownHook(new HangingJobKiller()); // handle the windows portion of \r if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) { newLine = "\r\n"; @@ -104,7 +103,6 @@ public abstract class Launcher { public void reset() { failureMap = Maps.newHashMap(); totalHadoopTimeSpent = 0; - jc = null; } /** @@ -179,7 +177,7 @@ public abstract class Launcher { String exceptionCreateFailMsg = null; boolean jobFailed = false; if (msgs.length > 0) { - if (HadoopShims.isJobFailed(report)) { + if (report.getCurrentStatus()== TIPStatus.FAILED) { jobFailed = true; } Set<String> errorMessageSet = new HashSet<String>(); @@ -261,11 +259,30 @@ public abstract class Launcher { List<Job> runnJobs = jc.getRunningJobs(); for (Job j : runnJobs) { - prog += HadoopShims.progressOfRunningJob(j); + prog += progressOfRunningJob(j); } return prog; } + /** + * Returns the progress of a Job j which is part of a submitted JobControl + * object. The progress is for this Job. So it has to be scaled down by the + * num of jobs that are present in the JobControl. + * + * @param j The Job for which progress is required + * @return Returns the percentage progress of this Job + * @throws IOException + */ + private static double progressOfRunningJob(Job j) + throws IOException { + org.apache.hadoop.mapreduce.Job mrJob = j.getJob(); + try { + return (mrJob.mapProgress() + mrJob.reduceProgress()) / 2; + } catch (Exception ir) { + return 0; + } + } + public long getTotalHadoopTimeSpent() { return totalHadoopTimeSpent; }
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Wed Feb 22 09:43:41 2017 @@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.TaskA import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor; @@ -122,7 +123,8 @@ public class FetchLauncher { poStore.setUp(); TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID(); - HadoopShims.setTaskAttemptId(conf, taskAttemptID); + //Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop + conf.setInt(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, taskAttemptID.getId()); if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) { MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java Wed Feb 22 09:43:41 2017 @@ -95,7 +95,7 @@ public class FetchPOStoreImpl extends PO } if (outputCommitter.needsTaskCommit(context)) outputCommitter.commitTask(context); - HadoopShims.commitOrCleanup(outputCommitter, context); + outputCommitter.commitJob(context); } @Override @@ -109,7 +109,7 @@ public class FetchPOStoreImpl extends PO } writer = null; } - HadoopShims.commitOrCleanup(outputCommitter, context); + outputCommitter.commitJob(context); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java Wed Feb 22 09:43:41 2017 @@ -22,43 +22,48 @@ import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Reducer; - -import org.apache.pig.impl.io.PigNullableWritable; import org.apache.pig.impl.io.NullableTuple; +import org.apache.pig.impl.io.PigNullableWritable; /** * A special implementation of combiner used only for distinct. This combiner * does not even parse out the records. It just throws away duplicate values - * in the key in order ot minimize the data being sent to the reduce. + * in the key in order to minimize the data being sent to the reduce. */ public class DistinctCombiner { - public static class Combine + public static class Combine extends Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> { - + private final Log log = LogFactory.getLog(getClass()); - ProgressableReporter pigReporter; - - /** - * Configures the reporter - */ + private static boolean firstTime = true; + + //@StaticDataCleanup + public static void staticDataCleanup() { + firstTime = true; + } + @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); - pigReporter = new ProgressableReporter(); + Configuration jConf = context.getConfiguration(); + // Avoid log spamming + if (firstTime) { + log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location")); + firstTime = false; + } } - + /** * The reduce function which removes values. */ @Override - protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) + protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) throws IOException, InterruptedException { - - pigReporter.setRep(context); // Take the first value and the key and collect // just that. @@ -66,6 +71,7 @@ public class DistinctCombiner { NullableTuple val = iter.next(); context.write(key, val); } + } - + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java Wed Feb 22 09:43:41 2017 @@ -75,16 +75,24 @@ public class FileBasedOutputSizeReader i return -1; } - long bytes = 0; Path p = new Path(getLocationUri(sto)); - FileSystem fs = p.getFileSystem(conf); - FileStatus[] lst = fs.listStatus(p); + return getPathSize(p, p.getFileSystem(conf)); + } + + private long getPathSize(Path storePath, FileSystem fs) throws IOException { + long bytes = 0; + FileStatus[] lst = fs.listStatus(storePath); if (lst != null) { for (FileStatus status : lst) { - bytes += status.getLen(); + if (status.isFile()) { + if (status.getLen() > 0) + bytes += status.getLen(); + } + else { // recursively count nested leaves' (files) sizes + bytes += getPathSize(status.getPath(), fs); + } } } - return bytes; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Wed Feb 22 09:43:41 2017 @@ -92,7 +92,7 @@ public class InputSizeReducerEstimator i return reducers; } - static long getTotalInputFileSize(Configuration conf, + public static long getTotalInputFileSize(Configuration conf, List<POLoad> lds, Job job) throws IOException { return getTotalInputFileSize(conf, lds, job, Long.MAX_VALUE); } @@ -100,7 +100,7 @@ public class InputSizeReducerEstimator i /** * Get the input size for as many inputs as possible. Inputs that do not report * their size nor can pig look that up itself are excluded from this size. - * + * * @param conf Configuration * @param lds List of POLoads * @param job Job Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Feb 22 09:43:41 2017 @@ -24,7 +24,6 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.lang.reflect.Method; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; @@ -61,6 +60,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobPriority; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.pig.ComparisonFunc; import org.apache.pig.ExecType; import org.apache.pig.FuncSpec; @@ -71,6 +71,7 @@ import org.apache.pig.PigException; import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.HDataType; +import org.apache.pig.backend.hadoop.PigJobControl; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.JobCreationException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner; @@ -89,7 +90,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataType; @@ -122,6 +122,7 @@ import org.apache.pig.impl.util.ObjectSe import org.apache.pig.impl.util.Pair; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.Utils; +import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil; import org.apache.pig.tools.pigstats.mapreduce.MRScriptState; @@ -311,7 +312,7 @@ public class JobControlCompiler{ " should be a time in ms. default=" + defaultPigJobControlSleep, e); } - JobControl jobCtrl = HadoopShims.newJobControl(grpName, timeToSleep); + JobControl jobCtrl = new PigJobControl(grpName, timeToSleep); try { List<MapReduceOper> roots = new LinkedList<MapReduceOper>(); @@ -384,7 +385,7 @@ public class JobControlCompiler{ ArrayList<Pair<String,Long>> counterPairs; try { - counters = HadoopShims.getCounters(job); + counters = MRJobStats.getCounters(job); String groupName = getGroupName(counters.getGroupNames()); // In case that the counter group was not find, we need to find @@ -702,7 +703,8 @@ public class JobControlCompiler{ // since this path would be invalid for the new job being created pigContext.getProperties().remove("mapreduce.job.credentials.binary"); - conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext)); + conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pigContext.getExecType().isLocal()); + conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pigContext.getLog4jProperties())); conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList())); // this is for unit tests since some don't create PigServer @@ -1671,14 +1673,6 @@ public class JobControlCompiler{ if (distCachePath != null) { log.info("Jar file " + url + " already in DistributedCache as " + distCachePath + ". Not copying to hdfs and adding again"); - // Path already in dist cache - if (!HadoopShims.isHadoopYARN()) { - // Mapreduce in YARN includes $PWD/* which will add all *.jar files in classapth. - // So don't have to ensure that the jar is separately added to mapreduce.job.classpath.files - // But path may only be in 'mapred.cache.files' and not be in - // 'mapreduce.job.classpath.files' in Hadoop 1.x. So adding it there - DistributedCache.addFileToClassPath(distCachePath, conf, distCachePath.getFileSystem(conf)); - } } else { // REGISTER always copies locally the jar file. see PigServer.registerJar() @@ -1964,20 +1958,9 @@ public class JobControlCompiler{ public static void setOutputFormat(org.apache.hadoop.mapreduce.Job job) { // the OutputFormat we report to Hadoop is always PigOutputFormat which - // can be wrapped with LazyOutputFormat provided if it is supported by - // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set + // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) { - try { - Class<?> clazz = PigContext - .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat"); - Method method = clazz.getMethod("setOutputFormatClass", - org.apache.hadoop.mapreduce.Job.class, Class.class); - method.invoke(null, job, PigOutputFormat.class); - } catch (Exception e) { - job.setOutputFormatClass(PigOutputFormat.class); - log.warn(PigConfiguration.PIG_OUTPUT_LAZY - + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used"); - } + LazyOutputFormat.setOutputFormatClass(job,PigOutputFormat.class); } else { job.setOutputFormatClass(PigOutputFormat.class); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Feb 22 09:43:41 2017 @@ -1116,7 +1116,9 @@ public class MRCompiler extends PhyPlanV try{ nonBlocking(op); phyToMROpMap.put(op, curMROp); - if (op.getPkgr().getPackageType() == PackageType.JOIN) { + if (op.getPkgr().getPackageType() == PackageType.JOIN + || op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) { + // Bloom join is not implemented in mapreduce mode and falls back to regular join curMROp.markRegularJoin(); } else if (op.getPkgr().getPackageType() == PackageType.GROUP) { if (op.getNumInps() == 1) { @@ -1278,7 +1280,7 @@ public class MRCompiler extends PhyPlanV List<InputSplit> splits = inf.getSplits(HadoopShims.cloneJobContext(job)); List<List<InputSplit>> results = MapRedUtil .getCombinePigSplits(splits, - HadoopShims.getDefaultBlockSize(fs, path), + fs.getDefaultBlockSize(path), conf); numFiles += results.size(); } else { @@ -2432,7 +2434,7 @@ public class MRCompiler extends PhyPlanV }else{ for(int i=0; i<transformPlans.size(); i++) { eps1.add(transformPlans.get(i)); - flat1.add(true); + flat1.add(i == transformPlans.size() - 1 ? true : false); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Feb 22 09:43:41 2017 @@ -19,7 +19,9 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.io.PrintStream; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Calendar; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -40,7 +42,8 @@ import org.apache.hadoop.mapred.JobClien import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapred.TaskReport; +import org.apache.hadoop.mapreduce.Cluster; +import org.apache.hadoop.mapreduce.TaskReport; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapreduce.TaskType; import org.apache.pig.PigConfiguration; @@ -65,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.CompilationMessageCollector; @@ -78,15 +82,18 @@ import org.apache.pig.impl.util.Utils; import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.PigStatsUtil; +import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil; import org.apache.pig.tools.pigstats.mapreduce.MRScriptState; +import org.python.google.common.collect.Lists; + /** * Main class that launches pig for Map Reduce * */ -public class MapReduceLauncher extends Launcher{ +public class MapReduceLauncher extends Launcher { public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; @@ -94,14 +101,30 @@ public class MapReduceLauncher extends L private boolean aggregateWarning = false; + public MapReduceLauncher() { + super(); + Utils.addShutdownHookWithPriority(new HangingJobKiller(), + PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY); + } + @Override public void kill() { try { - log.debug("Receive kill signal"); - if (jc!=null) { + if (jc != null && jc.getRunningJobs().size() > 0) { + log.info("Received kill signal"); for (Job job : jc.getRunningJobs()) { - HadoopShims.killJob(job); + org.apache.hadoop.mapreduce.Job mrJob = job.getJob(); + try { + if (mrJob != null) { + mrJob.killJob(); + } + } catch (Exception ir) { + throw new IOException(ir); + } log.info("Job " + job.getAssignedJobID() + " killed"); + String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + .format(Calendar.getInstance().getTime()); + System.err.println(timeStamp + " Job " + job.getAssignedJobID() + " killed"); } } } catch (Exception e) { @@ -301,8 +324,7 @@ public class MapReduceLauncher extends L // Now wait, till we are finished. while(!jc.allFinished()){ - try { jcThread.join(sleepTime); } - catch (InterruptedException e) {} + jcThread.join(sleepTime); List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>(); @@ -321,11 +343,6 @@ public class MapReduceLauncher extends L log.info("detailed locations: " + aliasLocation); } - if (!HadoopShims.isHadoopYARN() && jobTrackerLoc != null) { - log.info("More information at: http://" + jobTrackerLoc - + "/jobdetails.jsp?jobid=" + job.getAssignedJobID()); - } - // update statistics for this job so jobId is set MRPigStatsUtil.addJobStats(job); MRScriptState.get().emitJobStartedNotification( @@ -475,10 +492,6 @@ public class MapReduceLauncher extends L for (Job job : succJobs) { List<POStore> sts = jcc.getStores(job); for (POStore st : sts) { - if (Utils.isLocal(pc, job.getJobConf())) { - HadoopShims.storeSchemaForLocal(job, st); - } - if (!st.isTmpStore()) { // create an "_SUCCESS" file in output location if // output location is a filesystem dir @@ -744,7 +757,7 @@ public class MapReduceLauncher extends L @SuppressWarnings("deprecation") void computeWarningAggregate(Job job, Map<Enum, Long> aggMap) { try { - Counters counters = HadoopShims.getCounters(job); + Counters counters = MRJobStats.getCounters(job); if (counters==null) { long nullCounterCount = @@ -798,13 +811,13 @@ public class MapReduceLauncher extends L throw new ExecException(backendException); } try { - Iterator<TaskReport> mapRep = HadoopShims.getTaskReports(job, TaskType.MAP); + Iterator<TaskReport> mapRep = MRJobStats.getTaskReports(job, TaskType.MAP); if (mapRep != null) { getErrorMessages(mapRep, "map", errNotDbg, pigContext); totalHadoopTimeSpent += computeTimeSpent(mapRep); mapRep = null; } - Iterator<TaskReport> redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE); + Iterator<TaskReport> redRep = MRJobStats.getTaskReports(job, TaskType.REDUCE); if (redRep != null) { getErrorMessages(redRep, "reduce", errNotDbg, pigContext); totalHadoopTimeSpent += computeTimeSpent(redRep); @@ -822,5 +835,6 @@ public class MapReduceLauncher extends L throw new ExecException(e); } } + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Wed Feb 22 09:43:41 2017 @@ -65,7 +65,10 @@ public class MapReduceOper extends Opera // this is needed when the key is null to create // an appropriate NullableXXXWritable object public byte mapKeyType; - + + //record the map key types of all splittees + public byte[] mapKeyTypeOfSplittees; + //Indicates that the map plan creation //is complete boolean mapDone = false; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Wed Feb 22 09:43:41 2017 @@ -18,6 +18,7 @@ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -580,18 +581,17 @@ class MultiQueryOptimizer extends MROpPl } private boolean hasSameMapKeyType(List<MapReduceOper> splittees) { - boolean sameKeyType = true; - for (MapReduceOper outer : splittees) { - for (MapReduceOper inner : splittees) { - if (inner.mapKeyType != outer.mapKeyType) { - sameKeyType = false; - break; + Set<Byte> keyTypes = new HashSet<Byte>(); + for (MapReduceOper splittee : splittees) { + keyTypes.add(splittee.mapKeyType); + if (splittee.mapKeyTypeOfSplittees != null) { + for (int i = 0; i < splittee.mapKeyTypeOfSplittees.length; i++) { + keyTypes.add(splittee.mapKeyTypeOfSplittees[i]); } } - if (!sameKeyType) break; - } - return sameKeyType; + } + return keyTypes.size() == 1; } private int setIndexOnLRInSplit(int initial, POSplit splitOp, boolean sameKeyType) @@ -1035,10 +1035,20 @@ class MultiQueryOptimizer extends MROpPl splitter.mapKeyType = sameKeyType ? mergeList.get(0).mapKeyType : DataType.TUPLE; + + setMapKeyTypeForSplitter(splitter,mergeList); + log.info("Requested parallelism of splitter: " + splitter.getRequestedParallelism()); } + private void setMapKeyTypeForSplitter(MapReduceOper splitter, List<MapReduceOper> mergeList) { + splitter.mapKeyTypeOfSplittees = new byte[mergeList.size()]; + for (int i = 0; i < mergeList.size(); i++) { + splitter.mapKeyTypeOfSplittees[i] = mergeList.get(i).mapKeyType; + } + } + private void mergeSingleMapReduceSplittee(MapReduceOper mapReduce, MapReduceOper splitter, POSplit splitOp) throws VisitorException { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Wed Feb 22 09:43:41 2017 @@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,9 +37,11 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; import org.apache.pig.impl.util.ObjectSerializer; @@ -72,7 +75,6 @@ public class PigCombiner { PhysicalOperator[] roots; PhysicalOperator leaf; - PigContext pigContext = null; private volatile boolean initialized = false; //@StaticDataCleanup @@ -91,9 +93,11 @@ public class PigCombiner { Configuration jConf = context.getConfiguration(); try { PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list"))); - pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext")); - if (pigContext.getLog4jProperties()!=null) - PropertyConfigurator.configure(pigContext.getLog4jProperties()); + Properties log4jProperties = (Properties) ObjectSerializer + .deserialize(jConf.get(PigImplConstants.PIG_LOG4J_PROPERTIES)); + if (log4jProperties != null) { + PropertyConfigurator.configure(log4jProperties); + } UDFContext.getUDFContext().reset(); MapRedUtil.setupUDFContext(context.getConfiguration()); @@ -143,7 +147,7 @@ public class PigCombiner { pigReporter.setRep(context); PhysicalOperator.setReporter(pigReporter); - boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning")); + boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning")); PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance(); pigStatusReporter.setContext(new MRTaskContext(context)); PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); @@ -157,7 +161,7 @@ public class PigCombiner { // tuples out of the getnext() call of POJoinPackage // In this case, we process till we see EOP from // POJoinPacakage.getNext() - if (pack.getPkgr() instanceof JoinPackager) + if (pack.getPkgr() instanceof JoinPackager || pack.getPkgr() instanceof BloomPackager) { pack.attachInput(key, tupIter.iterator()); while (true) @@ -268,7 +272,6 @@ public class PigCombiner { pigReporter = null; // Avoid OOM in Tez. PhysicalOperator.setReporter(null); - pigContext = null; roots = null; cp = null; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Wed Feb 22 09:43:41 2017 @@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,6 +46,7 @@ import org.apache.pig.data.SchemaTupleBa import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.io.PigNullableWritable; import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.OperatorKey; @@ -88,7 +90,6 @@ public abstract class PigGenericMapBase private PhysicalOperator leaf; - PigContext pigContext = null; private volatile boolean initialized = false; /** @@ -168,13 +169,15 @@ public abstract class PigGenericMapBase inIllustrator = inIllustrator(context); PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list"))); - pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext")); // This attempts to fetch all of the generated code from the distributed cache, and resolve it - SchemaTupleBackend.initialize(job, pigContext); + SchemaTupleBackend.initialize(job); - if (pigContext.getLog4jProperties()!=null) - PropertyConfigurator.configure(pigContext.getLog4jProperties()); + Properties log4jProperties = (Properties) ObjectSerializer + .deserialize(job.get(PigImplConstants.PIG_LOG4J_PROPERTIES)); + if (log4jProperties != null) { + PropertyConfigurator.configure(log4jProperties); + } if (mp == null) mp = (PhysicalPlan) ObjectSerializer.deserialize( @@ -236,7 +239,7 @@ public abstract class PigGenericMapBase pigReporter.setRep(context); PhysicalOperator.setReporter(pigReporter); - boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning")); + boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning")); PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance(); pigStatusReporter.setContext(new MRTaskContext(context)); PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); @@ -249,8 +252,7 @@ public abstract class PigGenericMapBase MapReducePOStoreImpl impl = new MapReducePOStoreImpl(context); store.setStoreImpl(impl); - if (!pigContext.inIllustrator) - store.setUp(); + store.setUp(); } } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Wed Feb 22 09:43:41 2017 @@ -287,7 +287,6 @@ public class PigGenericMapReduce { private PhysicalOperator leaf; - PigContext pigContext = null; protected volatile boolean initialized = false; private boolean inIllustrator = false; @@ -319,10 +318,9 @@ public class PigGenericMapReduce { sJobConf = context.getConfiguration(); try { PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list"))); - pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext")); // This attempts to fetch all of the generated code from the distributed cache, and resolve it - SchemaTupleBackend.initialize(jConf, pigContext); + SchemaTupleBackend.initialize(jConf); if (rp == null) rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf @@ -377,7 +375,7 @@ public class PigGenericMapReduce { pigReporter.setRep(context); PhysicalOperator.setReporter(pigReporter); - boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning")); + boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning")); PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance(); pigStatusReporter.setContext(new MRTaskContext(context)); PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); @@ -608,7 +606,7 @@ public class PigGenericMapReduce { pigReporter.setRep(context); PhysicalOperator.setReporter(pigReporter); - boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning")); + boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning")); PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance(); pigStatusReporter.setContext(new MRTaskContext(context)); PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Wed Feb 22 09:43:41 2017 @@ -17,9 +17,6 @@ */ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; -import java.util.Map; -import java.util.WeakHashMap; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.EvalFunc; @@ -41,7 +38,6 @@ public final class PigHadoopLogger imple private PigStatusReporter reporter = null; private boolean aggregate = false; - private Map<Object, String> msgMap = new WeakHashMap<Object, String>(); private PigHadoopLogger() { } @@ -68,11 +64,6 @@ public final class PigHadoopLogger imple if (getAggregate()) { if (reporter != null) { - // log at least once - if (msgMap.get(o) == null || !msgMap.get(o).equals(displayMessage)) { - log.warn(displayMessage); - msgMap.put(o, displayMessage); - } if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) { reporter.incrCounter(className, warningEnum.name(), 1); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Wed Feb 22 09:43:41 2017 @@ -197,14 +197,11 @@ public class PigInputFormat extends Inpu ArrayList<FileSpec> inputs; ArrayList<ArrayList<OperatorKey>> inpTargets; - PigContext pigContext; try { inputs = (ArrayList<FileSpec>) ObjectSerializer .deserialize(conf.get(PIG_INPUTS)); inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer .deserialize(conf.get(PIG_INPUT_TARGETS)); - pigContext = (PigContext) ObjectSerializer.deserialize(conf - .get("pig.pigContext")); PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list"))); MapRedUtil.setupUDFContext(conf); } catch (Exception e) { @@ -234,7 +231,7 @@ public class PigInputFormat extends Inpu // if the execution is against Mapred DFS, set // working dir to /user/<userid> - if(!Utils.isLocal(pigContext, conf)) { + if(!Utils.isLocal(conf)) { fs.setWorkingDirectory(jobcontext.getWorkingDirectory()); } @@ -270,7 +267,7 @@ public class PigInputFormat extends Inpu jobcontext.getJobID())); List<InputSplit> oneInputPigSplits = getPigSplits( oneInputSplits, i, inpTargets.get(i), - HadoopShims.getDefaultBlockSize(fs, isFsPath? path: fs.getWorkingDirectory()), + fs.getDefaultBlockSize(isFsPath? path: fs.getWorkingDirectory()), combinable, confClone); splits.addAll(oneInputPigSplits); } catch (ExecException ee) { Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Wed Feb 22 09:43:41 2017 @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + + +import java.io.IOException; + +import java.net.URI; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configuration.IntegerRanges; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.StatusReporter; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.Reducer.Context; +import org.apache.hadoop.mapreduce.lib.map.WrappedMapper; +import org.apache.hadoop.mapreduce.task.MapContextImpl; +import org.apache.hadoop.security.Credentials; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.pig.impl.util.Pair; +import org.apache.hadoop.mapreduce.lib.map.WrappedMapper; + +abstract public class PigMapBase extends PigGenericMapBase { + /** + * + * Get mapper's illustrator context + * + * @param conf Configuration + * @param input Input bag to serve as data source + * @param output Map output buffer + * @param split the split + * @return Illustrator's context + * @throws IOException + * @throws InterruptedException + */ + @Override + public Context getIllustratorContext(Configuration conf, DataBag input, + List<Pair<PigNullableWritable, Writable>> output, InputSplit split) + throws IOException, InterruptedException { + org.apache.hadoop.mapreduce.Mapper.Context mapperContext = new WrappedMapper<Text, Tuple, PigNullableWritable, Writable>().getMapContext(new IllustratorContext(conf, input, output, split)); + return mapperContext; + } + + public class IllustratorContext extends MapContextImpl<Text, Tuple, PigNullableWritable, Writable> { + private DataBag input; + List<Pair<PigNullableWritable, Writable>> output; + private Iterator<Tuple> it = null; + private Tuple value = null; + private boolean init = false; + + public IllustratorContext(Configuration conf, DataBag input, + List<Pair<PigNullableWritable, Writable>> output, + InputSplit split) throws IOException, InterruptedException { + super(conf, new TaskAttemptID(), null, null, null, new IllustrateDummyReporter(), split); + conf.set("inIllustrator", "true"); + if (output == null) + throw new IOException("Null output can not be used"); + this.input = input; this.output = output; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (input == null) { + if (!init) { + init = true; + return true; + } + return false; + } + if (it == null) + it = input.iterator(); + if (!it.hasNext()) + return false; + value = it.next(); + return true; + } + + @Override + public Text getCurrentKey() { + return null; + } + + @Override + public Tuple getCurrentValue() { + return value; + } + + @Override + public void write(PigNullableWritable key, Writable value) + throws IOException, InterruptedException { + output.add(new Pair<PigNullableWritable, Writable>(key, value)); + } + + @Override + public void progress() { + + } + } + + @Override + public boolean inIllustrator(Context context) { + return ((WrappedMapper.Context)context).getConfiguration().get("inIllustrator")!=null; + } +} Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Wed Feb 22 09:43:41 2017 @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configuration.IntegerRanges; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.jobcontrol.Job; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.ReduceContext; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.Reducer.Context; +import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer; +import org.apache.hadoop.mapreduce.task.ReduceContextImpl; +import org.apache.hadoop.security.Credentials; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase.IllustratorContext; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.NullableTuple; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.pig.impl.util.Pair; +import org.apache.pig.pen.FakeRawKeyValueIterator; + +public class PigMapReduce extends PigGenericMapReduce { + + static class IllustrateReducerContext extends WrappedReducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> { + public IllustratorContext + getReducerContext(ReduceContext<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> reduceContext) { + return new IllustratorContext(reduceContext); + } + + public class IllustratorContext + extends WrappedReducer.Context { + public IllustratorContext( + ReduceContext<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> reduceContext) { + super(reduceContext); + } + public POPackage getPack() { + return ((Reduce.IllustratorContextImpl)reduceContext).pack; + } + } + } + + public static class Reduce extends PigGenericMapReduce.Reduce { + /** + * Get reducer's illustrator context + * + * @param input Input buffer as output by maps + * @param pkg package + * @return reducer's illustrator context + * @throws IOException + * @throws InterruptedException + */ + @Override + public Context getIllustratorContext(Job job, + List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException { + org.apache.hadoop.mapreduce.Reducer.Context reducerContext = new IllustrateReducerContext() + .getReducerContext(new IllustratorContextImpl(job, input, pkg)); + return reducerContext; + } + + @SuppressWarnings("unchecked") + public class IllustratorContextImpl extends ReduceContextImpl<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> { + private PigNullableWritable currentKey = null, nextKey = null; + private NullableTuple nextValue = null; + private List<NullableTuple> currentValues = null; + private Iterator<Pair<PigNullableWritable, Writable>> it; + private final ByteArrayOutputStream bos; + private final DataOutputStream dos; + private final RawComparator sortComparator, groupingComparator; + public POPackage pack = null; + private IllustratorValueIterable iterable = new IllustratorValueIterable(); + + public IllustratorContextImpl(Job job, + List<Pair<PigNullableWritable, Writable>> input, + POPackage pkg + ) throws IOException, InterruptedException { + super(job.getJobConf(), new TaskAttemptID(), new FakeRawKeyValueIterator(input.iterator().hasNext()), + null, null, null, null, new IllustrateDummyReporter(), null, PigNullableWritable.class, NullableTuple.class); + bos = new ByteArrayOutputStream(); + dos = new DataOutputStream(bos); + org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf()); + sortComparator = nwJob.getSortComparator(); + groupingComparator = nwJob.getGroupingComparator(); + + Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() { + @Override + public int compare(Pair<PigNullableWritable, Writable> o1, + Pair<PigNullableWritable, Writable> o2) { + try { + o1.first.write(dos); + int l1 = bos.size(); + o2.first.write(dos); + int l2 = bos.size(); + byte[] bytes = bos.toByteArray(); + bos.reset(); + return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1); + } catch (IOException e) { + throw new RuntimeException("Serialization exception in sort:"+e.getMessage()); + } + } + } + ); + currentValues = new ArrayList<NullableTuple>(); + it = input.iterator(); + if (it.hasNext()) { + Pair<PigNullableWritable, Writable> entry = it.next(); + nextKey = entry.first; + nextValue = (NullableTuple) entry.second; + } + pack = pkg; + } + + public class IllustratorValueIterator implements ReduceContext.ValueIterator<NullableTuple> { + + private int pos = -1; + private int mark = -1; + + @Override + public void mark() throws IOException { + mark=pos-1; + if (mark<-1) + mark=-1; + } + + @Override + public void reset() throws IOException { + pos=mark; + } + + @Override + public void clearMark() throws IOException { + mark=-1; + } + + @Override + public boolean hasNext() { + return pos<currentValues.size()-1; + } + + @Override + public NullableTuple next() { + pos++; + return currentValues.get(pos); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("remove not implemented"); + } + + @Override + public void resetBackupStore() throws IOException { + pos=-1; + mark=-1; + } + + } + + protected class IllustratorValueIterable implements Iterable<NullableTuple> { + private IllustratorValueIterator iterator = new IllustratorValueIterator(); + @Override + public Iterator<NullableTuple> iterator() { + return iterator; + } + } + + @Override + public PigNullableWritable getCurrentKey() { + return currentKey; + } + + @Override + public boolean nextKey() { + if (nextKey == null) + return false; + currentKey = nextKey; + currentValues.clear(); + currentValues.add(nextValue); + nextKey = null; + for(; it.hasNext(); ) { + Pair<PigNullableWritable, Writable> entry = it.next(); + /* Why can't raw comparison be used? + byte[] bytes; + int l1, l2; + try { + currentKey.write(dos); + l1 = bos.size(); + entry.first.write(dos); + l2 = bos.size(); + bytes = bos.toByteArray(); + } catch (IOException e) { + throw new RuntimeException("nextKey exception : "+e.getMessage()); + } + bos.reset(); + if (groupingComparator.compare(bytes, 0, l1, bytes, l1, l2-l1) == 0) + */ + if (groupingComparator.compare(currentKey, entry.first) == 0) + { + currentValues.add((NullableTuple)entry.second); + } else { + nextKey = entry.first; + nextValue = (NullableTuple) entry.second; + break; + } + } + return true; + } + + @Override + public Iterable<NullableTuple> getValues() { + return iterable; + } + + @Override + public void write(PigNullableWritable k, Writable t) { + } + + @Override + public void progress() { + } + } + + @Override + public boolean inIllustrator(org.apache.hadoop.mapreduce.Reducer.Context context) { + return (context instanceof PigMapReduce.IllustrateReducerContext.IllustratorContext); + } + + @Override + public POPackage getPack(org.apache.hadoop.mapreduce.Reducer.Context context) { + return ((PigMapReduce.IllustrateReducerContext.IllustratorContext) context).getPack(); + } + } +} Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Wed Feb 22 09:43:41 2017 @@ -18,7 +18,6 @@ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; import java.io.IOException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; @@ -156,12 +155,7 @@ public class PigOutputCommitter extends for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) { if (mapCommitter.first!=null) { try { - // Use reflection, Hadoop 1.x line does not have such method - Method m = mapCommitter.first.getClass().getMethod("isRecoverySupported"); - allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery - && (Boolean)m.invoke(mapCommitter.first); - } catch (NoSuchMethodException e) { - allOutputCommitterSupportRecovery = false; + allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && mapCommitter.first.isRecoverySupported(); } catch (Exception e) { throw new RuntimeException(e); } @@ -173,12 +167,7 @@ public class PigOutputCommitter extends reduceOutputCommitters) { if (reduceCommitter.first!=null) { try { - // Use reflection, Hadoop 1.x line does not have such method - Method m = reduceCommitter.first.getClass().getMethod("isRecoverySupported"); - allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery - && (Boolean)m.invoke(reduceCommitter.first); - } catch (NoSuchMethodException e) { - allOutputCommitterSupportRecovery = false; + allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && reduceCommitter.first.isRecoverySupported(); } catch (Exception e) { throw new RuntimeException(e); } @@ -197,10 +186,7 @@ public class PigOutputCommitter extends mapCommitter.second); try { // Use reflection, Hadoop 1.x line does not have such method - Method m = mapCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class); - m.invoke(mapCommitter.first, updatedContext); - } catch (NoSuchMethodException e) { - // We are using Hadoop 1.x, ignore + mapCommitter.first.recoverTask(updatedContext); } catch (Exception e) { throw new IOException(e); } @@ -212,11 +198,7 @@ public class PigOutputCommitter extends TaskAttemptContext updatedContext = setUpContext(context, reduceCommitter.second); try { - // Use reflection, Hadoop 1.x line does not have such method - Method m = reduceCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class); - m.invoke(reduceCommitter.first, updatedContext); - } catch (NoSuchMethodException e) { - // We are using Hadoop 1.x, ignore + reduceCommitter.first.recoverTask(updatedContext); } catch (Exception e) { throw new IOException(e); } @@ -256,10 +238,7 @@ public class PigOutputCommitter extends mapCommitter.second); // PIG-2642 promote files before calling storeCleanup/storeSchema try { - // Use reflection, 20.2 does not have such method - Method m = mapCommitter.first.getClass().getMethod("commitJob", JobContext.class); - m.setAccessible(true); - m.invoke(mapCommitter.first, updatedContext); + mapCommitter.first.commitJob(updatedContext); } catch (Exception e) { throw new IOException(e); } @@ -273,10 +252,7 @@ public class PigOutputCommitter extends reduceCommitter.second); // PIG-2642 promote files before calling storeCleanup/storeSchema try { - // Use reflection, 20.2 does not have such method - Method m = reduceCommitter.first.getClass().getMethod("commitJob", JobContext.class); - m.setAccessible(true); - m.invoke(reduceCommitter.first, updatedContext); + reduceCommitter.first.commitJob(updatedContext); } catch (Exception e) { throw new IOException(e); } @@ -293,10 +269,7 @@ public class PigOutputCommitter extends JobContext updatedContext = setUpContext(context, mapCommitter.second); try { - // Use reflection, 20.2 does not have such method - Method m = mapCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class); - m.setAccessible(true); - m.invoke(mapCommitter.first, updatedContext, state); + mapCommitter.first.abortJob(updatedContext, state); } catch (Exception e) { throw new IOException(e); } @@ -309,10 +282,7 @@ public class PigOutputCommitter extends JobContext updatedContext = setUpContext(context, reduceCommitter.second); try { - // Use reflection, 20.2 does not have such method - Method m = reduceCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class); - m.setAccessible(true); - m.invoke(reduceCommitter.first, updatedContext, state); + reduceCommitter.first.abortJob(updatedContext, state); } catch (Exception e) { throw new IOException(e); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Wed Feb 22 09:43:41 2017 @@ -515,9 +515,11 @@ public class PigSplit extends InputSplit for (int i = 0; i < wrappedSplits.length; i++) { st.append("Input split["+i+"]:\n Length = "+ wrappedSplits[i].getLength()+"\n ClassName: " + wrappedSplits[i].getClass().getName() + "\n Locations:\n"); - for (String location : wrappedSplits[i].getLocations()) - st.append(" "+location+"\n"); - st.append("\n-----------------------\n"); + if (wrappedSplits[i]!=null && wrappedSplits[i].getLocations()!=null) { + for (String location : wrappedSplits[i].getLocations()) + st.append(" "+location+"\n"); + st.append("\n-----------------------\n"); + } } } catch (IOException e) { return null; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java Wed Feb 22 09:43:41 2017 @@ -26,21 +26,21 @@ public class DiscreteProbabilitySampleGe Random rGen; float[] probVec; float epsilon = 0.0001f; - + private static final Log LOG = LogFactory.getLog(DiscreteProbabilitySampleGenerator.class); - - public DiscreteProbabilitySampleGenerator(float[] probVec) { - rGen = new Random(); + + public DiscreteProbabilitySampleGenerator(long seed, float[] probVec) { + rGen = new Random(seed); float sum = 0.0f; for (float f : probVec) { sum += f; } this.probVec = probVec; - if (1-epsilon > sum || sum > 1+epsilon) { + if (1-epsilon > sum || sum > 1+epsilon) { LOG.info("Sum of probabilities should be near one: " + sum); } } - + public int getNext(){ double toss = rGen.nextDouble(); // if the uniformly random number that I generated @@ -57,13 +57,13 @@ public class DiscreteProbabilitySampleGe toss -= probVec[i]; if(toss<=0.0) return i; - } + } return lastIdx; } - + public static void main(String[] args) { float[] vec = { 0, 0.3f, 0.2f, 0, 0, 0.5f }; - DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(vec); + DiscreteProbabilitySampleGenerator gen = new DiscreteProbabilitySampleGenerator(11317, vec); CountingMap<Integer> cm = new CountingMap<Integer>(); for(int i=0;i<100;i++){ cm.put(gen.getNext(), 1); @@ -75,6 +75,6 @@ public class DiscreteProbabilitySampleGe public String toString() { return Arrays.toString(probVec); } - - + + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Wed Feb 22 09:43:41 2017 @@ -17,7 +17,6 @@ */ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -31,13 +30,13 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.HDataType; -import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.DataBag; import org.apache.pig.data.InternalMap; import org.apache.pig.data.Tuple; -import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.builtin.FindQuantiles; import org.apache.pig.impl.io.NullableBigDecimalWritable; import org.apache.pig.impl.io.NullableBigIntegerWritable; @@ -52,7 +51,6 @@ import org.apache.pig.impl.io.NullableTe import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; import org.apache.pig.impl.io.ReadToEndLoader; -import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.Utils; public class WeightedRangePartitioner extends Partitioner<PigNullableWritable, Writable> @@ -62,7 +60,6 @@ public class WeightedRangePartitioner ex new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>(); protected PigNullableWritable[] quantiles; protected RawComparator<PigNullableWritable> comparator; - private PigContext pigContext; protected Configuration job; protected boolean inited = false; @@ -93,11 +90,6 @@ public class WeightedRangePartitioner ex @SuppressWarnings("unchecked") public void init() { weightedParts = new HashMap<PigNullableWritable, DiscreteProbabilitySampleGenerator>(); - try { - pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext")); - } catch (IOException e) { - throw new RuntimeException("Failed to deserialize pig context: ", e); - } String quantilesFile = job.get("pig.quantilesFile", ""); if (quantilesFile.length() == 0) { @@ -109,10 +101,10 @@ public class WeightedRangePartitioner ex // use local file system to get the quantilesFile Map<String, Object> quantileMap = null; Configuration conf; - if (!pigContext.getExecType().isLocal()) { - conf = ConfigurationUtil.toConfiguration(pigContext.getProperties()); - } else { + if (job.getBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, false)) { conf = new Configuration(false); + } else { + conf = new Configuration(job); } if (job.get("fs.file.impl") != null) { conf.set("fs.file.impl", job.get("fs.file.impl")); @@ -138,11 +130,13 @@ public class WeightedRangePartitioner ex DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST); InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS); convertToArray(quantilesList); + long taskIdHashCode = job.get(MRConfiguration.TASK_ID).hashCode(); + long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL); for (Entry<Object, Object> ent : weightedPartsData.entrySet()) { Tuple key = (Tuple)ent.getKey(); // sample item which repeats float[] probVec = getProbVec((Tuple)ent.getValue()); weightedParts.put(getPigNullableWritable(key), - new DiscreteProbabilitySampleGenerator(probVec)); + new DiscreteProbabilitySampleGenerator(randomSeed, probVec)); } } // else - the quantiles file is empty - unless we have a bug, the Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Wed Feb 22 09:43:41 2017 @@ -21,14 +21,16 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.VisitorException; @@ -105,7 +107,7 @@ public class EndOfAllInputSetter extends public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException { endOfAllInputFlag = true; } - + @Override public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException { endOfAllInputFlag = true; @@ -122,6 +124,13 @@ public class EndOfAllInputSetter extends } } + @Override + public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{ + if (lr instanceof POBuildBloomRearrangeTez) { + endOfAllInputFlag = true; + } + super.visitLocalRearrange(lr); + } /** * @return if end of all input is present Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRPrinter.java Wed Feb 22 09:43:41 2017 @@ -27,7 +27,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter; -import org.apache.pig.impl.plan.DepthFirstWalker; +import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.VisitorException; /** @@ -43,7 +43,7 @@ public class MRPrinter extends MROpPlanV * @param plan MR plan to print */ public MRPrinter(PrintStream ps, MROperPlan plan) { - super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan)); + super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(plan, true)); mStream = ps; mStream.println("#--------------------------------------------------"); mStream.println("# Map Reduce Plan "); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Wed Feb 22 09:43:41 2017 @@ -441,6 +441,10 @@ public abstract class PhysicalOperator e public void reset() { } + public boolean isEndOfAllInput() { + return parentPlan.endOfAllInput; + } + /** * @return PigProgressable stored in threadlocal */ Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/Divide.java Wed Feb 22 09:43:41 2017 @@ -19,7 +19,10 @@ package org.apache.pig.backend.hadoop.ex import java.math.BigDecimal; import java.math.BigInteger; +import java.math.RoundingMode; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.pig.PigWarning; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; @@ -36,6 +39,8 @@ public class Divide extends BinaryExpres * */ private static final long serialVersionUID = 1L; + public static final short BIGDECIMAL_MINIMAL_SCALE = 6; + private static final Log LOG = LogFactory.getLog(Divide.class); public Divide(OperatorKey k) { super(k); @@ -72,12 +77,22 @@ public class Divide extends BinaryExpres case DataType.BIGINTEGER: return ((BigInteger) a).divide((BigInteger) b); case DataType.BIGDECIMAL: - return ((BigDecimal) a).divide((BigDecimal) b); + return bigDecimalDivideWithScale(a, b); default: throw new ExecException("called on unsupported Number class " + DataType.findTypeName(dataType)); } } + private Number bigDecimalDivideWithScale(Number a, Number b) { + // Using same result scaling as Hive. See Arithmetic Rules: + // https://cwiki.apache.org/confluence/download/attachments/27362075/Hive_Decimal_Precision_Scale_Support.pdf + int resultScale = Math.max(BIGDECIMAL_MINIMAL_SCALE, ((BigDecimal)a).scale() + ((BigDecimal)b).precision() + 1); + if (LOG.isDebugEnabled()) { + LOG.debug("For bigdecimal divide: using " + resultScale + " as result scale."); + } + return ((BigDecimal)a).divide((BigDecimal)b, resultScale, RoundingMode.HALF_UP); + } + /* * This method is used to invoke the appropriate method, as Java does not provide generic * dispatch for it.
