Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java Thu Nov 27 12:49:54 2014 @@ -38,7 +38,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup; @@ -57,6 +56,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.builtin.SampleLoader; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.VisitorException; @@ -80,7 +80,7 @@ public class FetchOptimizer { */ public static boolean isFetchEnabled(PigContext pc) { return "true".equalsIgnoreCase( - 
pc.getProperties().getProperty(PigConfiguration.OPT_FETCH, "true")); + pc.getProperties().getProperty(PigConfiguration.PIG_OPT_FETCH, "true")); } /** @@ -97,14 +97,20 @@ public class FetchOptimizer { FetchablePlanVisitor fpv = new FetchablePlanVisitor(pc, pp); fpv.visit(); // Plan is fetchable only if FetchablePlanVisitor returns true AND - // limit is present in the plan. Limit is a safeguard. If the input - // is large, and there is no limit, fetch optimizer will fetch the - // entire input to the client. That can be dangerous. - boolean isFetchable = fpv.isPlanFetchable() && - PlanHelper.containsPhysicalOperator(pp, POLimit.class); - if (isFetchable) - init(pp); - return isFetchable; + // limit is present in the plan, i.e: limit is pushed up to the loader. + // Limit is a safeguard. If the input is large, and there is no limit, + // fetch optimizer will fetch the entire input to the client. That can be dangerous. + if (!fpv.isPlanFetchable()) { + return false; + } + for (POLoad load : PlanHelper.getPhysicalOperators(pp, POLoad.class)) { + if (load.getLimit() == -1) { + return false; + } + } + pc.getProperties().setProperty(PigImplConstants.CONVERTED_TO_FETCH, "true"); + init(pp); + return true; } return false; }
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java Thu Nov 27 12:49:54 2014 @@ -17,6 +17,8 @@ */ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -26,8 +28,6 @@ import org.apache.hadoop.fs.Path; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.impl.util.UriUtil; -import java.io.IOException; - /** * Class that computes the size of output for file-based systems. */ @@ -43,19 +43,23 @@ public class FileBasedOutputSizeReader i */ @Override public boolean supports(POStore sto, Configuration conf) { - String storeFuncName = sto.getStoreFunc().getClass().getCanonicalName(); - // Some store functions do not support file-based output reader (e.g. - // HCatStorer), so they should be excluded. 
- String unsupported = conf.get( - PigStatsOutputSizeReader.OUTPUT_SIZE_READER_UNSUPPORTED); - if (unsupported != null) { - for (String s : unsupported.split(",")) { - if (s.equalsIgnoreCase(storeFuncName)) { - return false; + boolean nullOrSupportedScheme = UriUtil.isHDFSFileOrLocalOrS3N(getLocationUri(sto), conf); + if (nullOrSupportedScheme) { + // Some store functions that do not have scheme + // do not support file-based output reader (e.g.HCatStorer), + // so they should be excluded. + String unsupported = conf.get( + PigStatsOutputSizeReader.OUTPUT_SIZE_READER_UNSUPPORTED); + if (unsupported != null) { + String storeFuncName = sto.getStoreFunc().getClass().getCanonicalName(); + for (String s : unsupported.split(",")) { + if (s.equalsIgnoreCase(storeFuncName)) { + return false; + } } } } - return UriUtil.isHDFSFileOrLocalOrS3N(getLocationUri(sto), conf); + return nullOrSupportedScheme; } /** Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Thu Nov 27 12:49:54 2014 @@ -92,12 +92,27 @@ public class InputSizeReducerEstimator i return reducers; } + static long getTotalInputFileSize(Configuration conf, + List<POLoad> lds, Job job) throws IOException { + return getTotalInputFileSize(conf, lds, job, Long.MAX_VALUE); + } + /** * Get the input size for as many inputs as possible. Inputs that do not report * their size nor can pig look that up itself are excluded from this size. 
+ * + * @param conf Configuration + * @param lds List of POLoads + * @param job Job + * @param max Maximum value of total input size that will trigger exit. Many + * times we're only interested whether the total input size is greater than + * X or not. In such case, we can exit the function early as soon as the max + * is reached. + * @return + * @throws IOException */ static long getTotalInputFileSize(Configuration conf, - List<POLoad> lds, Job job) throws IOException { + List<POLoad> lds, Job job, long max) throws IOException { long totalInputFileSize = 0; for (POLoad ld : lds) { long size = getInputSizeFromLoader(ld, job); @@ -115,8 +130,14 @@ public class InputSizeReducerEstimator i FileStatus[] status = fs.globStatus(path); if (status != null) { for (FileStatus s : status) { - totalInputFileSize += MapRedUtil.getPathLength(fs, s); + totalInputFileSize += MapRedUtil.getPathLength(fs, s, max); + if (totalInputFileSize > max) { + break; + } } + } else { + // If file is not found, we should report -1 + return -1; } } else { // If we cannot estimate size of a location, we should report -1 Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Nov 27 12:49:54 2014 @@ -17,9 +17,10 @@ */ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; +import static org.apache.pig.PigConfiguration.PIG_EXEC_REDUCER_ESTIMATOR; +import static 
org.apache.pig.PigConfiguration.PIG_EXEC_REDUCER_ESTIMATOR_CONSTRUCTOR_ARG_KEY; + import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -77,9 +78,9 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.UdfCacheShipFilesVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup; @@ -163,9 +164,6 @@ public class JobControlCompiler{ public static final String END_OF_INP_IN_MAP = "pig.invoke.close.in.map"; - private static final String REDUCER_ESTIMATOR_KEY = "pig.exec.reducer.estimator"; - private static final String REDUCER_ESTIMATOR_ARG_KEY = "pig.exec.reducer.estimator.arg"; - public static final String PIG_MAP_COUNTER = "pig.counters.counter_"; public static final String PIG_MAP_RANK_NAME = "pig.rank_"; public static final String PIG_MAP_SEPARATOR = "_"; @@ -447,8 +445,8 @@ public class JobControlCompiler{ return false; } - long totalInputFileSize = InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job); long inputByteMax = conf.getLong(PigConfiguration.PIG_AUTO_LOCAL_INPUT_MAXBYTES, 
100*1000*1000l); + long totalInputFileSize = InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job, inputByteMax); log.info("Size of input: " + totalInputFileSize +" bytes. Small job threshold: " + inputByteMax ); if (totalInputFileSize < 0 || totalInputFileSize > inputByteMax) { return false; @@ -505,7 +503,7 @@ public class JobControlCompiler{ Path tmpLocation = null; // add settings for pig statistics - String setScriptProp = conf.get(PigConfiguration.INSERT_ENABLED, "true"); + String setScriptProp = conf.get(PigConfiguration.PIG_SCRIPT_INFO_ENABLED, "true"); if (setScriptProp.equalsIgnoreCase("true")) { MRScriptState ss = MRScriptState.get(); ss.addSettingsToConf(mro, conf); @@ -546,42 +544,6 @@ public class JobControlCompiler{ nwJob.setNumReduceTasks(0); } - for (String udf : mro.UDFs) { - if (udf.contains("GFCross")) { - Object func = pigContext.instantiateFuncFromSpec(new FuncSpec(udf)); - if (func instanceof GFCross) { - String crossKey = ((GFCross)func).getCrossKey(); - // If non GFCross has been processed yet - if (pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey)==null) { - pigContext.getProperties().setProperty(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey, - Integer.toString(nwJob.getNumReduceTasks())); - } - conf.set(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey, - (String)pigContext.getProperties().get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." 
+ crossKey)); - } - } - } - - if(lds!=null && lds.size()>0){ - for (POLoad ld : lds) { - //Store the target operators for tuples read - //from this input - List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld); - List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>(); - if(ldSucs!=null){ - for (PhysicalOperator operator2 : ldSucs) { - ldSucKeys.add(operator2.getOperatorKey()); - } - } - inpTargets.add(ldSucKeys); - inpSignatureLists.add(ld.getSignature()); - inpLimits.add(ld.getLimit()); - //Remove the POLoad from the plan - if (!pigContext.inIllustrator) - mro.mapPlan.remove(ld); - } - } - if (!pigContext.inIllustrator && ! pigContext.getExecType().isLocal()) { if (okToRunLocal(nwJob, mro, lds)) { @@ -610,6 +572,22 @@ public class JobControlCompiler{ conf.setBoolean(PigImplConstants.CONVERTED_TO_LOCAL, true); } else { log.info(BIG_JOB_LOG_MSG); + // Search to see if we have any UDF/LoadFunc/StoreFunc that need to pack things into the + // distributed cache. + List<String> cacheFiles = new ArrayList<String>(); + List<String> shipFiles = new ArrayList<String>(); + UdfCacheShipFilesVisitor mapUdfCacheFileVisitor = new UdfCacheShipFilesVisitor(mro.mapPlan); + mapUdfCacheFileVisitor.visit(); + cacheFiles.addAll(mapUdfCacheFileVisitor.getCacheFiles()); + shipFiles.addAll(mapUdfCacheFileVisitor.getShipFiles()); + + UdfCacheShipFilesVisitor reduceUdfCacheFileVisitor = new UdfCacheShipFilesVisitor(mro.reducePlan); + reduceUdfCacheFileVisitor.visit(); + cacheFiles.addAll(reduceUdfCacheFileVisitor.getCacheFiles()); + shipFiles.addAll(reduceUdfCacheFileVisitor.getShipFiles()); + + setupDistributedCache(pigContext, conf, cacheFiles.toArray(new String[]{}), false); + // Setup the DistributedCache for this job List<URL> allJars = new ArrayList<URL>(); @@ -619,6 +597,19 @@ public class JobControlCompiler{ } } + for (String udf : mro.UDFs) { + Class clazz = pigContext.getClassForAlias(udf); + if (clazz != null) { + String jar = JarManager.findContainingJar(clazz); + if 
(jar!=null) { + URL jarURL = new File(jar).toURI().toURL(); + if (!allJars.contains(jarURL)) { + allJars.add(jarURL); + } + } + } + } + for (String scriptJar : pigContext.scriptJars) { URL jar = new File(scriptJar).toURI().toURL(); if (!allJars.contains(jar)) { @@ -626,6 +617,13 @@ public class JobControlCompiler{ } } + for (String shipFile : shipFiles) { + URL jar = new File(shipFile).toURI().toURL(); + if (!allJars.contains(jar)) { + allJars.add(jar); + } + } + for (String defaultJar : JarManager.getDefaultJars()) { URL jar = new File(defaultJar).toURI().toURL(); if (!allJars.contains(jar)) { @@ -641,7 +639,6 @@ public class JobControlCompiler{ } } if (!predeployed) { - log.info("Adding jar to DistributedCache: " + jar); putJarOnClassPathThroughDistributedCache(pigContext, conf, jar); } } @@ -653,6 +650,37 @@ public class JobControlCompiler{ } } + for (String udf : mro.UDFs) { + if (udf.contains("GFCross")) { + Object func = PigContext.instantiateFuncFromSpec(new FuncSpec(udf)); + if (func instanceof GFCross) { + String crossKey = ((GFCross)func).getCrossKey(); + conf.set(PigImplConstants.PIG_CROSS_PARALLELISM + "." 
+ crossKey, + Integer.toString(mro.getRequestedParallelism())); + } + } + } + + if(lds!=null && lds.size()>0){ + for (POLoad ld : lds) { + //Store the target operators for tuples read + //from this input + List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld); + List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>(); + if(ldSucs!=null){ + for (PhysicalOperator operator2 : ldSucs) { + ldSucKeys.add(operator2.getOperatorKey()); + } + } + inpTargets.add(ldSucKeys); + inpSignatureLists.add(ld.getSignature()); + inpLimits.add(ld.getLimit()); + //Remove the POLoad from the plan + if (!pigContext.inIllustrator) + mro.mapPlan.remove(ld); + } + } + if(Utils.isLocal(pigContext, conf)) { ConfigurationUtil.replaceConfigForLocalMode(conf); } @@ -779,10 +807,6 @@ public class JobControlCompiler{ // serialized setupDistributedCacheForJoin(mro, pigContext, conf); - // Search to see if we have any UDFs that need to pack things into the - // distributed cache. - setupDistributedCacheForUdfs(mro, pigContext, conf); - SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf); POPackage pack = null; @@ -1022,9 +1046,9 @@ public class JobControlCompiler{ Configuration conf = nwJob.getConfiguration(); // set various parallelism into the job conf for later analysis, PIG-2779 - conf.setInt("pig.info.reducers.default.parallel", pigContext.defaultParallel); - conf.setInt("pig.info.reducers.requested.parallel", mro.requestedParallelism); - conf.setInt("pig.info.reducers.estimated.parallel", mro.estimatedParallelism); + conf.setInt(PigImplConstants.REDUCER_DEFAULT_PARALLELISM, pigContext.defaultParallel); + conf.setInt(PigImplConstants.REDUCER_REQUESTED_PARALLELISM, mro.requestedParallelism); + conf.setInt(PigImplConstants.REDUCER_ESTIMATED_PARALLELISM, mro.estimatedParallelism); // this is for backward compatibility, and we encourage to use runtimeParallelism at runtime mro.requestedParallelism = jobParallelism; @@ -1080,10 +1104,10 @@ public class 
JobControlCompiler{ MapReduceOper mapReducerOper) throws IOException { Configuration conf = job.getConfiguration(); - PigReducerEstimator estimator = conf.get(REDUCER_ESTIMATOR_KEY) == null ? + PigReducerEstimator estimator = conf.get(PIG_EXEC_REDUCER_ESTIMATOR) == null ? new InputSizeReducerEstimator() : PigContext.instantiateObjectFromParams(conf, - REDUCER_ESTIMATOR_KEY, REDUCER_ESTIMATOR_ARG_KEY, PigReducerEstimator.class); + PIG_EXEC_REDUCER_ESTIMATOR, PIG_EXEC_REDUCER_ESTIMATOR_CONSTRUCTOR_ARG_KEY, PigReducerEstimator.class); log.info("Using reducer estimator: " + estimator.getClass().getName()); int numberOfReducers = estimator.estimateNumberOfReducers(job, mapReducerOper); @@ -1478,13 +1502,6 @@ public class JobControlCompiler{ .visit(); } - private void setupDistributedCacheForUdfs(MapReduceOper mro, - PigContext pigContext, - Configuration conf) throws IOException { - new UdfDistributedCacheVisitor(mro.mapPlan, pigContext, conf).visit(); - new UdfDistributedCacheVisitor(mro.reducePlan, pigContext, conf).visit(); - } - private static void setupDistributedCache(PigContext pigContext, Configuration conf, Properties properties, String key, @@ -1633,11 +1650,50 @@ public class JobControlCompiler{ // Turn on the symlink feature DistributedCache.createSymlink(conf); - // REGISTER always copies locally the jar file. see PigServer.registerJar() - Path pathInHDFS = shipToHDFS(pigContext, conf, url); - // and add to the DistributedCache - DistributedCache.addFileToClassPath(pathInHDFS, conf); - pigContext.addSkipJar(url.getPath()); + Path distCachePath = getExistingDistCacheFilePath(conf, url); + if (distCachePath != null) { + log.info("Jar file " + url + " already in DistributedCache as " + + distCachePath + ". Not copying to hdfs and adding again"); + // Path already in dist cache + if (!HadoopShims.isHadoopYARN()) { + // Mapreduce in YARN includes $PWD/* which will add all *.jar files in classapth. 
// So we don't have to ensure that the jar is separately added to mapreduce.job.classpath.files + // But the path may only be in 'mapred.cache.files' and not in + // 'mapreduce.job.classpath.files' in Hadoop 1.x, so add it there. + DistributedCache.addFileToClassPath(distCachePath, conf, distCachePath.getFileSystem(conf)); + } + } + else { + // REGISTER always copies locally the jar file. see PigServer.registerJar() + Path pathInHDFS = shipToHDFS(pigContext, conf, url); + DistributedCache.addFileToClassPath(pathInHDFS, conf, FileSystem.get(conf)); + log.info("Added jar " + url + " to DistributedCache through " + pathInHDFS); + } + + } + + private static Path getExistingDistCacheFilePath(Configuration conf, URL url) throws IOException { + URI[] cacheFileUris = DistributedCache.getCacheFiles(conf); + if (cacheFileUris != null) { + String fileName = url.getRef() == null ? FilenameUtils.getName(url.getPath()) : url.getRef(); + for (URI cacheFileUri : cacheFileUris) { + Path path = new Path(cacheFileUri); + String cacheFileName = cacheFileUri.getFragment() == null ? path.getName() : cacheFileUri.getFragment(); + // Match + // - if both filenames are same and no symlinks (or) + // - if both symlinks are same (or) + // - symlink of existing cache file is same as the name of the new file to be added. + // That would be the case when hbase-0.98.4.jar#hbase.jar is configured via Oozie + // and register hbase.jar is done in the pig script. + // If two different files are symlinked to the same name, then there is a conflict + // and hadoop itself does not guarantee which file will be symlinked to that name. + // So we are good.
+ if (fileName.equals(cacheFileName)) { + return path; + } + } + } + return null; } private static Path getCacheStagingDir(Configuration conf) throws IOException { @@ -1763,6 +1819,8 @@ public class JobControlCompiler{ ArrayList<String> replicatedPath = new ArrayList<String>(); FileSpec[] newReplFiles = new FileSpec[replFiles.length]; + long maxSize = Long.valueOf(pigContext.getProperties().getProperty( + PigConfiguration.PIG_JOIN_REPLICATED_MAX_BYTES, "1000000000")); // the first input is not replicated long sizeOfReplicatedInputs = 0; @@ -1782,7 +1840,7 @@ public class JobControlCompiler{ Path path = new Path(replFiles[i].getFileName()); FileSystem fs = path.getFileSystem(conf); sizeOfReplicatedInputs += - MapRedUtil.getPathLength(fs, fs.getFileStatus(path)); + MapRedUtil.getPathLength(fs, fs.getFileStatus(path), maxSize); } newReplFiles[i] = new FileSpec(symlink, (replFiles[i] == null ? null : replFiles[i].getFuncSpec())); @@ -1790,9 +1848,7 @@ public class JobControlCompiler{ join.setReplFiles(newReplFiles); - String maxSize = pigContext.getProperties().getProperty( - PigConfiguration.PIG_JOIN_REPLICATED_MAX_BYTES, "1000000000"); - if (sizeOfReplicatedInputs > Long.parseLong(maxSize)){ + if (sizeOfReplicatedInputs > maxSize) { throw new VisitorException("Replicated input files size: " + sizeOfReplicatedInputs + " exceeds " + PigConfiguration.PIG_JOIN_REPLICATED_MAX_BYTES + ": " + maxSize); @@ -1854,41 +1910,6 @@ public class JobControlCompiler{ } } - private static class UdfDistributedCacheVisitor extends PhyPlanVisitor { - - private PigContext pigContext = null; - private Configuration conf = null; - - public UdfDistributedCacheVisitor(PhysicalPlan plan, - PigContext pigContext, - Configuration conf) { - super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>( - plan)); - this.pigContext = pigContext; - this.conf = conf; - } - - @Override - public void visitUserFunc(POUserFunc func) throws VisitorException { - - // XXX Hadoop currently doesn't support 
distributed cache in local mode. - // This line will be removed after the support is added - if (Utils.isLocal(pigContext, conf)) return; - - // set up distributed cache for files indicated by the UDF - String[] files = func.getCacheFiles(); - if (files == null) return; - - try { - setupDistributedCache(pigContext, conf, files, false); - } catch (IOException e) { - String msg = "Internal error. Distributed cache could not " + - "be set up for the requested files"; - throw new VisitorException(msg, e); - } - } - } - private static class ParallelConstantVisitor extends PhyPlanVisitor { private int rp; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Nov 27 12:49:54 2014 @@ -1076,7 +1076,14 @@ public class MRCompiler extends PhyPlanV @Override public void visitPOForEach(POForEach op) throws VisitorException{ try{ - nonBlocking(op); + if (op.isMapSideOnly() && curMROp.isMapDone()) { + FileSpec fSpec = getTempFileSpec(); + MapReduceOper prevMROper = endSingleInputPlanWithStr(fSpec); + curMROp = startNew(fSpec, prevMROper); + curMROp.mapPlan.addAsLeaf(op); + } else { + nonBlocking(op); + } List<PhysicalPlan> plans = op.getInputPlans(); if(plans!=null) for (PhysicalPlan plan : plans) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Nov 27 12:49:54 2014 @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -637,10 +638,10 @@ public class MapReduceLauncher extends L pc.getProperties().getProperty( "last.input.chunksize", JoinPackager.DEFAULT_CHUNK_SIZE); - String prop = pc.getProperties().getProperty(PigConfiguration.PROP_NO_COMBINER); + String prop = pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_NO_COMBINER); if (!pc.inIllustrator && !("true".equals(prop))) { boolean doMapAgg = - Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PROP_EXEC_MAP_PARTAGG,"false")); + Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG,"false")); CombinerOptimizer co = new CombinerOptimizer(plan, doMapAgg); co.visit(); //display the warning message(s) from the CombinerOptimizer @@ -686,7 +687,7 @@ public class MapReduceLauncher extends L fRem.visit(); boolean isMultiQuery = - Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true")); + Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PIG_OPT_MULTIQUERY, "true")); if (isMultiQuery) { // reduces the number of MROpers in the MR plan generated @@ -797,13 +798,13 @@ public class MapReduceLauncher extends L throw new ExecException(backendException); } try { - TaskReport[] mapRep = 
HadoopShims.getTaskReports(job, TaskType.MAP); + Iterator<TaskReport> mapRep = HadoopShims.getTaskReports(job, TaskType.MAP); if (mapRep != null) { getErrorMessages(mapRep, "map", errNotDbg, pigContext); totalHadoopTimeSpent += computeTimeSpent(mapRep); mapRep = null; } - TaskReport[] redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE); + Iterator<TaskReport> redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE); if (redRep != null) { getErrorMessages(redRep, "reduce", errNotDbg, pigContext); totalHadoopTimeSpent += computeTimeSpent(redRep); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Thu Nov 27 12:49:54 2014 @@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Set; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor; @@ -28,6 +29,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; import 
org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.Operator; @@ -523,23 +525,32 @@ public class MapReduceOper extends Opera } private POCounter getCounterOperation() { - PhysicalOperator operator; - Iterator<PhysicalOperator> it = this.mapPlan.getLeaves().iterator(); - - while(it.hasNext()) { - operator = it.next(); - if(operator instanceof POCounter) - return (POCounter) operator; + POCounter counter = getCounterOperation(this.mapPlan); + if (counter == null) { + counter = getCounterOperation(this.reducePlan); } + return counter; + } - it = this.reducePlan.getLeaves().iterator(); + private POCounter getCounterOperation(PhysicalPlan plan) { + PhysicalOperator operator; + Iterator<PhysicalOperator> it = plan.getLeaves().iterator(); - while(it.hasNext()) { + while (it.hasNext()) { operator = it.next(); - if(operator instanceof POCounter) + if (operator instanceof POCounter) { return (POCounter) operator; + } else if (operator instanceof POStore) { + List<PhysicalOperator> preds = plan.getPredecessors(operator); + if (preds != null) { + for (PhysicalOperator pred : preds) { + if (pred instanceof POCounter) { + return (POCounter) pred; + } + } + } + } } - return null; } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Thu Nov 27 12:49:54 2014 @@ -121,6 +121,11 @@ class MultiQueryOptimizer extends MROpPl + " uses customPartitioner, do not merge it"); continue; } + if 
(successor.isCounterOperation()) { + log.debug("Splittee " + successor.getOperatorKey().getId() + + " has POCounter, do not merge it"); + continue; + } if (isMapOnly(successor)) { if (isSingleLoadMapperPlan(successor.mapPlan) && isSinglePredecessor(successor)) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Thu Nov 27 12:49:54 2014 @@ -349,6 +349,7 @@ public class PhyPlanSetter extends PhyPl @Override public void visitPreCombinerLocalRearrange( POPreCombinerLocalRearrange preCombinerLocalRearrange) throws VisitorException { + super.visitPreCombinerLocalRearrange(preCombinerLocalRearrange); preCombinerLocalRearrange.setParentPlan(parent); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java Thu Nov 27 12:49:54 2014 @@ -122,8 +122,10 @@ public class PigBytesRawComparator exten if( dataByteArraysCompare ) { rc = WritableComparator.compareBytes(b1, offset1, 
length1, b2, offset2, length2); } else { - // Subtract 2, one for null byte and one for index byte - rc = mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2); + // Subtract 2, one for null byte and one for index byte. Also, do not reverse the sign + // of rc when mAsc[0] is false because BinInterSedesTupleRawComparator.compare() already + // takes that into account. + return mWrappedComp.compare(b1, s1 + 1, l1 - 2, b2, s2 + 1, l2 - 2); } } else { // For sorting purposes two nulls are equal. Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Thu Nov 27 12:49:54 2014 @@ -27,7 +27,9 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Reducer; import org.apache.log4j.PropertyConfigurator; +import org.apache.pig.JVMReuseManager; import org.apache.pig.PigException; +import org.apache.pig.StaticDataCleanup; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.HDataType; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; @@ -73,6 +75,15 @@ public class PigCombiner { PigContext pigContext = null; private volatile boolean initialized = false; + static { + JVMReuseManager.getInstance().registerForStaticDataCleanup(Combine.class); + } + + @StaticDataCleanup + public static void staticDataCleanup() { + firstTime = true; + } + /** * Configures the Reduce plan, the POPackage operator * and the reporter thread 
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Thu Nov 27 12:49:54 2014 @@ -22,8 +22,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.joda.time.DateTimeZone; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -32,6 +30,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.PropertyConfigurator; +import org.apache.pig.PigConstants; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; @@ -54,6 +53,7 @@ import org.apache.pig.impl.plan.VisitorE import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.Pair; import org.apache.pig.impl.util.SpillableMemoryManager; +import org.apache.pig.impl.util.Utils; import org.apache.pig.tools.pigstats.PigStatusReporter; /** @@ -162,6 +162,7 @@ public abstract class PigGenericMapBase Configuration job = context.getConfiguration(); SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job)); + context.getConfiguration().set(PigConstants.TASK_INDEX, Integer.toString(context.getTaskAttemptID().getTaskID().getId())); PigMapReduce.sJobContext = context; 
PigMapReduce.sJobConfInternal.set(context.getConfiguration()); PigMapReduce.sJobConf = context.getConfiguration(); @@ -214,11 +215,7 @@ public abstract class PigGenericMapBase log.info("Aliases being processed per job phase (AliasName[line,offset]): " + job.get("pig.alias.location")); - String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz"); - if (dtzStr != null && dtzStr.length() > 0) { - // ensure that the internal timezone is uniformly in UTC offset style - DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null))); - } + Utils.setDefaultTimeZone(PigMapReduce.sJobConfInternal.get()); } /** Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Thu Nov 27 12:49:54 2014 @@ -30,7 +30,10 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Reducer; +import org.apache.pig.JVMReuseManager; +import org.apache.pig.PigConstants; import org.apache.pig.PigException; +import org.apache.pig.StaticDataCleanup; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.HDataType; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; @@ -57,8 +60,8 @@ import org.apache.pig.impl.util.ObjectSe import org.apache.pig.impl.util.Pair; import org.apache.pig.impl.util.SpillableMemoryManager; 
import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.impl.util.Utils; import org.apache.pig.tools.pigstats.PigStatusReporter; -import org.joda.time.DateTimeZone; /** * This class is the static Mapper & Reducer classes that @@ -100,6 +103,17 @@ public class PigGenericMapReduce { public static ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>(); + static { + JVMReuseManager.getInstance().registerForStaticDataCleanup(PigGenericMapReduce.class); + } + + @StaticDataCleanup + public static void staticDataCleanup() { + sJobContext = null; + sJobConf = null; + sJobConfInternal = new ThreadLocal<Configuration>(); + } + public static class Map extends PigMapBase { @Override @@ -306,6 +320,7 @@ public class PigGenericMapReduce { pack = getPack(context); Configuration jConf = context.getConfiguration(); SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf)); + context.getConfiguration().set(PigConstants.TASK_INDEX, Integer.toString(context.getTaskAttemptID().getTaskID().getId())); sJobContext = context; sJobConfInternal.set(context.getConfiguration()); sJobConf = context.getConfiguration(); @@ -347,11 +362,7 @@ public class PigGenericMapReduce { log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location")); - String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz"); - if (dtzStr != null && dtzStr.length() > 0) { - // ensure that the internal timezone is uniformly in UTC offset style - DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null))); - } + Utils.setDefaultTimeZone(PigMapReduce.sJobConfInternal.get()); } /** Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Thu Nov 27 12:49:54 2014 @@ -56,17 +56,11 @@ public final class PigHadoopLogger imple return logger; } - public void destroy() { - if (reporter != null) { - reporter.destroy(); - } - reporter = null; - } - public void setReporter(PigStatusReporter reporter) { this.reporter = reporter; } + @Override @SuppressWarnings("rawtypes") public void warn(Object o, String msg, Enum warningEnum) { String className = o.getClass().getName(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Thu Nov 27 12:49:54 2014 @@ -93,6 +93,7 @@ public class PigInputFormat extends Inpu Configuration conf = context.getConfiguration(); PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer .deserialize(conf.get("udf.import.list"))); + MapRedUtil.setupUDFContext(conf); LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf); // Pass loader signature to LoadFunc and to InputFormat through // the conf Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduceCounter.java Thu Nov 27 12:49:54 2014 @@ -60,6 +60,13 @@ public class PigMapReduceCounter { pOperator = mp.getPredecessors(pOperator).get(0); } } + + PigStatusReporter reporter = PigStatusReporter.getInstance(); + if (reporter != null) { + reporter.incrCounter( + JobControlCompiler.PIG_MAP_RANK_NAME + + context.getJobID().toString(), taskID, 0); + } } /** @@ -69,15 +76,11 @@ public class PigMapReduceCounter { public void collect(Context context, Tuple tuple) throws InterruptedException, IOException { context.write(null, tuple); - try { - PigStatusReporter reporter = PigStatusReporter.getInstance(); - if (reporter != null) { - reporter.incrCounter( - JobControlCompiler.PIG_MAP_RANK_NAME - + context.getJobID().toString(), taskID, 1); - } - } catch (Exception ex) { - log.error("Error on incrementer of PigMapCounter"); + PigStatusReporter reporter = PigStatusReporter.getInstance(); + if (reporter != null) { + reporter.incrCounter( + JobControlCompiler.PIG_MAP_RANK_NAME + + context.getJobID().toString(), taskID, 1); } } } @@ -116,6 +119,7 @@ public class PigMapReduceCounter { } this.context = context; + incrementCounter(0L); } /** @@ -127,21 +131,14 @@ public class PigMapReduceCounter { * @param increment is the value to add to the corresponding global counter. 
**/ public static void incrementCounter(Long increment) { - try { - PigStatusReporter reporter = PigStatusReporter.getInstance(); - if (reporter != null) { - - if(leaf instanceof POCounter){ - reporter.incrCounter( - JobControlCompiler.PIG_MAP_RANK_NAME - + context.getJobID().toString(), taskID, increment); - } - + PigStatusReporter reporter = PigStatusReporter.getInstance(); + if (reporter != null) { + if(leaf instanceof POCounter){ + reporter.incrCounter( + JobControlCompiler.PIG_MAP_RANK_NAME + + context.getJobID().toString(), taskID, increment); } - } catch (Exception ex) { - log.error("Error on incrementer of PigReduceCounter"); } - } } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Thu Nov 27 12:49:54 2014 @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.web.resources.OverwriteParam; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.FileAlreadyExistsException; import org.apache.hadoop.mapreduce.Job; @@ -37,7 +36,6 @@ import org.apache.pig.backend.hadoop.dat import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; -import org.apache.pig.builtin.PigStorage; import org.apache.pig.data.Tuple; import 
org.apache.pig.impl.PigContext; import org.apache.pig.impl.util.ObjectSerializer; @@ -50,19 +48,19 @@ import org.apache.pig.impl.util.ObjectSe */ @SuppressWarnings("unchecked") public class PigOutputFormat extends OutputFormat<WritableComparable, Tuple> { - + private enum Mode { SINGLE_STORE, MULTI_STORE}; - + /** the temporary directory for the multi store */ public static final String PIG_MAPRED_OUTPUT_DIR = "pig.mapred.output.dir"; /** the relative path that can be used to build a temporary * place to store the output from a number of map-reduce tasks*/ public static final String PIG_TMP_PATH = "pig.tmp.path"; - - List<POStore> reduceStores = null; - List<POStore> mapStores = null; - Configuration currentConf = null; - + + protected List<POStore> reduceStores = null; + protected List<POStore> mapStores = null; + protected Configuration currentConf = null; + @Override public RecordWriter<WritableComparable, Tuple> getRecordWriter(TaskAttemptContext taskattemptcontext) throws IOException, InterruptedException { @@ -97,27 +95,27 @@ public class PigOutputFormat extends Out @SuppressWarnings("unchecked") static public class PigRecordWriter extends RecordWriter<WritableComparable, Tuple> { - + /** * the actual RecordWriter */ private RecordWriter wrappedWriter; - + /** * the StoreFunc for the single store */ private StoreFuncInterface sFunc; - + /** * Single Query or multi query */ private Mode mode; - - public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncInterface sFunc, + + public PigRecordWriter(RecordWriter wrappedWriter, StoreFuncInterface sFunc, Mode mode) - throws IOException { + throws IOException { this.mode = mode; - + if(mode == Mode.SINGLE_STORE) { this.wrappedWriter = wrappedWriter; this.sFunc = sFunc; @@ -128,7 +126,7 @@ public class PigOutputFormat extends Out /** * We only care about the values, so we are going to skip the keys when * we write. 
- * + * * @see org.apache.hadoop.mapreduce.RecordWriter#write(Object, Object) */ @Override @@ -142,7 +140,7 @@ public class PigOutputFormat extends Out } @Override - public void close(TaskAttemptContext taskattemptcontext) throws + public void close(TaskAttemptContext taskattemptcontext) throws IOException, InterruptedException { if(mode == Mode.SINGLE_STORE) { wrappedWriter.close(taskattemptcontext); @@ -150,24 +148,24 @@ public class PigOutputFormat extends Out } } - + /** * Before delegating calls to underlying OutputFormat or OutputCommitter * Pig needs to ensure the Configuration in the JobContext contains - * the output location and StoreFunc - * for the specific store - so set these up in the context for this specific + * the output location and StoreFunc + * for the specific store - so set these up in the context for this specific * store * @param jobContext the {@link JobContext} * @param store the POStore * @throws IOException on failure */ - public static void setLocation(JobContext jobContext, POStore store) throws + public static void setLocation(JobContext jobContext, POStore store) throws IOException { Job storeJob = new Job(jobContext.getConfiguration()); StoreFuncInterface storeFunc = store.getStoreFunc(); String outputLocation = store.getSFile().getFileName(); storeFunc.setStoreLocation(outputLocation, storeJob); - + // the setStoreLocation() method would indicate to the StoreFunc // to set the output location for its underlying OutputFormat. // Typically OutputFormat's store the output location in the @@ -176,7 +174,7 @@ public class PigOutputFormat extends Out // OutputFormat might have set) and merge it with the Configuration // we started with so that when this method returns the Configuration // supplied as input has the updates. 
- ConfigurationUtil.mergeConf(jobContext.getConfiguration(), + ConfigurationUtil.mergeConf(jobContext.getConfiguration(), storeJob.getConfiguration()); } @@ -187,20 +185,20 @@ public class PigOutputFormat extends Out checkOutputSpecsHelper(reduceStores, jobcontext); } - private void checkOutputSpecsHelper(List<POStore> stores, JobContext + private void checkOutputSpecsHelper(List<POStore> stores, JobContext jobcontext) throws IOException, InterruptedException { for (POStore store : stores) { // make a copy of the original JobContext so that - // each OutputFormat get a different copy + // each OutputFormat get a different copy JobContext jobContextCopy = HadoopShims.createJobContext( jobcontext.getConfiguration(), jobcontext.getJobID()); - + // set output location PigOutputFormat.setLocation(jobContextCopy, store); - + StoreFuncInterface sFunc = store.getStoreFunc(); OutputFormat of = sFunc.getOutputFormat(); - + // The above call should have update the conf in the JobContext // to have the output location - now call checkOutputSpecs() try { @@ -224,23 +222,22 @@ public class PigOutputFormat extends Out * @param currentConf2 * @param storeLookupKey * @return - * @throws IOException + * @throws IOException */ - private List<POStore> getStores(Configuration conf, String storeLookupKey) + private List<POStore> getStores(Configuration conf, String storeLookupKey) throws IOException { return (List<POStore>)ObjectSerializer.deserialize( conf.get(storeLookupKey)); } - - - private void setupUdfEnvAndStores(JobContext jobcontext) + + protected void setupUdfEnvAndStores(JobContext jobcontext) throws IOException{ Configuration newConf = jobcontext.getConfiguration(); - - // We setup UDFContext so in StoreFunc.getOutputFormat, which is called inside + + // We setup UDFContext so in StoreFunc.getOutputFormat, which is called inside // construct of PigOutputCommitter, can make use of it MapRedUtil.setupUDFContext(newConf); - + // if there is a udf in the plan we would need to 
know the import // path so we can instantiate the udf. This is required because // we will be deserializing the POStores out of the plan in the next @@ -261,13 +258,13 @@ public class PigOutputFormat extends Out // config properties have changed (eg. creating stores). currentConf = new Configuration(newConf); } - + /** * Check if given property prop is same in configurations conf1, conf2 * @param prop * @param conf1 * @param conf2 - * @return true if both are equal + * @return true if both are equal */ private boolean isConfPropEqual(String prop, Configuration conf1, Configuration conf2) { @@ -283,10 +280,10 @@ public class PigOutputFormat extends Out } @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext + public OutputCommitter getOutputCommitter(TaskAttemptContext taskattemptcontext) throws IOException, InterruptedException { setupUdfEnvAndStores(taskattemptcontext); - + // we return an instance of PigOutputCommitter to Hadoop - this instance // will wrap the real OutputCommitter(s) belonging to the store(s) return new PigOutputCommitter(taskattemptcontext, Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Thu Nov 27 12:49:54 2014 @@ -17,8 +17,8 @@ */ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; -import static org.apache.pig.PigConfiguration.TIME_UDFS; -import static org.apache.pig.PigConfiguration.TIME_UDFS_FREQUENCY; +import static 
org.apache.pig.PigConfiguration.PIG_UDF_PROFILE; +import static org.apache.pig.PigConfiguration.PIG_UDF_PROFILE_FREQUENCY; import static org.apache.pig.PigConstants.TIME_UDFS_ELAPSED_TIME_COUNTER; import java.io.IOException; @@ -119,10 +119,10 @@ public class PigRecordReader extends Rec idx = 0; this.limit = limit; initNextRecordReader(); - doTiming = inputSpecificConf.getBoolean(TIME_UDFS, false); + doTiming = inputSpecificConf.getBoolean(PIG_UDF_PROFILE, false); if (doTiming) { counterGroup = loadFunc.toString(); - timingFrequency = inputSpecificConf.getLong(TIME_UDFS_FREQUENCY, 100L); + timingFrequency = inputSpecificConf.getLong(PIG_UDF_PROFILE_FREQUENCY, 100L); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java Thu Nov 27 12:49:54 2014 @@ -31,6 +31,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.HDataType; +import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.DataBag; @@ -109,7 +110,7 @@ public class WeightedRangePartitioner ex Map<String, Object> quantileMap = null; Configuration conf; if 
(!pigContext.getExecType().isLocal()) { - conf = new Configuration(true); + conf = ConfigurationUtil.toConfiguration(pigContext.getProperties()); } else { conf = new Configuration(false); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Thu Nov 27 12:49:54 2014 @@ -24,6 +24,8 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.JVMReuseManager; +import org.apache.pig.StaticDataCleanup; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; @@ -100,7 +102,7 @@ public abstract class PhysicalOperator e // Will be used by operators to report status or transmit heartbeat // Should be set by the backends to appropriate implementations that // wrap their own version of a reporter. 
- public static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>(); + protected static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>(); // Will be used by operators to aggregate warning messages // Should be set by the backends to appropriate implementations that @@ -120,6 +122,10 @@ public abstract class PhysicalOperator e private List<OriginalLocation> originalLocations = new ArrayList<OriginalLocation>(); + static { + JVMReuseManager.getInstance().registerForStaticDataCleanup(PhysicalOperator.class); + } + public PhysicalOperator(OperatorKey k) { this(k, -1, null); } @@ -402,9 +408,9 @@ public abstract class PhysicalOperator e } public Result getNextDataBag() throws ExecException { - Result ret = null; + Result val = new Result(); DataBag tmpBag = BagFactory.getInstance().newDefaultBag(); - for (ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) { + for (Result ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) { if (ret.returnStatus == POStatus.STATUS_ERR) { return ret; } else if (ret.returnStatus == POStatus.STATUS_NULL) { @@ -413,9 +419,9 @@ public abstract class PhysicalOperator e tmpBag.add((Tuple) ret.result); } } - ret.result = tmpBag; - ret.returnStatus = (tmpBag.size() == 0)? POStatus.STATUS_EOP : POStatus.STATUS_OK; - return ret; + val.result = tmpBag; + val.returnStatus = (tmpBag.size() == 0)? POStatus.STATUS_EOP : POStatus.STATUS_OK; + return val; } public Result getNextBigInteger() throws ExecException { @@ -451,6 +457,11 @@ public abstract class PhysicalOperator e PhysicalOperator.reporter.set(reporter); } + @StaticDataCleanup + public static void staticDataCleanup() { + reporter = new ThreadLocal<PigProgressable>(); + } + /** * Make a deep copy of this operator. 
This function is blank, however, * we should leave a place holder so that the subclasses can clone Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Thu Nov 27 12:49:54 2014 @@ -18,10 +18,10 @@ package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators; -import static org.apache.pig.PigConfiguration.TIME_UDFS; -import static org.apache.pig.PigConfiguration.TIME_UDFS_FREQUENCY; -import static org.apache.pig.PigConstants.TIME_UDFS_INVOCATION_COUNTER; +import static org.apache.pig.PigConfiguration.PIG_UDF_PROFILE; +import static org.apache.pig.PigConfiguration.PIG_UDF_PROFILE_FREQUENCY; import static org.apache.pig.PigConstants.TIME_UDFS_ELAPSED_TIME_COUNTER; +import static org.apache.pig.PigConstants.TIME_UDFS_INVOCATION_COUNTER; import java.io.IOException; import java.io.ObjectInputStream; @@ -37,7 +37,6 @@ import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; import org.apache.pig.PigException; -import org.apache.pig.PigWarning; import org.apache.pig.TerminatingAccumulator; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; @@ -67,7 +66,8 @@ public class POUserFunc extends Expressi private transient String counterGroup; private transient EvalFunc func; - private transient String[] cacheFiles = null; + private 
transient List<String> cacheFiles = null; + private transient List<String> shipFiles = null; FuncSpec funcSpec; FuncSpec origFSpec; @@ -157,10 +157,10 @@ public class POUserFunc extends Expressi func.setPigLogger(pigLogger); Configuration jobConf = UDFContext.getUDFContext().getJobConf(); if (jobConf != null) { - doTiming = jobConf.getBoolean(TIME_UDFS, false); + doTiming = jobConf.getBoolean(PIG_UDF_PROFILE, false); if (doTiming) { counterGroup = funcSpec.toString(); - timingFrequency = jobConf.getLong(TIME_UDFS_FREQUENCY, 100L); + timingFrequency = jobConf.getLong(PIG_UDF_PROFILE_FREQUENCY, 100L); } } // We initialize here instead of instantiateFunc because this is called @@ -280,27 +280,6 @@ public class POUserFunc extends Expressi } try { if(result.returnStatus == POStatus.STATUS_OK) { - Tuple t = (Tuple) result.result; - - // For backward compatibility, we short-circuit tuples whose - // size is 1 and field is null. (See PIG-3679) - if (t.size() == 1 && t.isNull(0)) { - pigLogger.warn(this, "All the input values are null, skipping the invocation of UDF", - PigWarning.SKIP_UDF_CALL_FOR_NULL); - Schema outputSchema = func.outputSchema(func.getInputSchema()); - // If the output schema is tuple (i.e. multiple fields are - // to be returned), we return a tuple where every field is - // null. - if (outputSchema != null && outputSchema.getField(0).type == DataType.TUPLE) { - result.result = tf.newTuple(outputSchema.getField(0).schema.size()); - // Otherwise, we simply return null since it can be cast to - // any data type. 
- } else { - result.result = null; - } - return result; - } - if (isAccumulative()) { if (isAccumStarted()) { if (!haveCheckedIfTerminatingAccumulator) { @@ -554,20 +533,28 @@ public class POUserFunc extends Expressi public FuncSpec getFuncSpec() { return funcSpec; } - + public void setFuncSpec(FuncSpec funcSpec) { this.funcSpec = funcSpec; instantiateFunc(funcSpec); } - public String[] getCacheFiles() { + public List<String> getCacheFiles() { return cacheFiles; } - public void setCacheFiles(String[] cf) { + public void setCacheFiles(List<String> cf) { cacheFiles = cf; } + public List<String> getShipFiles() { + return shipFiles; + } + + public void setShipFiles(List<String> sf) { + shipFiles = sf; + } + public boolean combinable() { return (func instanceof Algebraic); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Nov 27 12:49:54 2014 @@ -347,7 +347,12 @@ public class PhyPlanVisitor extends Plan */ public void visitPreCombinerLocalRearrange( POPreCombinerLocalRearrange preCombinerLocalRearrange) throws VisitorException { - // TODO Auto-generated method stub + List<PhysicalPlan> inpPlans = preCombinerLocalRearrange.getPlans(); + for (PhysicalPlan plan : inpPlans) { + pushWalker(mCurrentWalker.spawnChildWalker(plan)); + visit(); + popWalker(); + } } public void visitPartialAgg(POPartialAgg poPartialAgg) throws VisitorException { Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Thu Nov 27 12:49:54 2014 @@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex import java.util.Arrays; import java.util.Map; +import org.apache.pig.PigConfiguration; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; @@ -48,6 +49,9 @@ public class CombinerPackager extends Pa private Map<Integer, Integer> keyLookup; private int numBags; + + private transient boolean initialized; + private transient boolean useDefaultBag; /** * A new POPostCombinePackage will be constructed as a near clone of the @@ -91,15 +95,16 @@ public class CombinerPackager extends Pa } private DataBag createDataBag(int numBags) { - String bagType = null; - if (PigMapReduce.sJobConfInternal.get() != null) { - bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type"); - } - - if (bagType != null && bagType.equalsIgnoreCase("default")) { - return new NonSpillableDataBag(); + if (!initialized) { + initialized = true; + if (PigMapReduce.sJobConfInternal.get() != null) { + String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_TYPE); + if (bagType != null && bagType.equalsIgnoreCase("default")) { + useDefaultBag = true; 
+ } + } } - return new InternalCachedBag(numBags); + return useDefaultBag ? new NonSpillableDataBag() : new InternalCachedBag(numBags); } @Override Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java Thu Nov 27 12:49:54 2014 @@ -242,9 +242,15 @@ public class MultiQueryPackager extends @Override public Tuple getValueTuple(PigNullableWritable keyWritable, - NullableTuple ntup, int index) throws ExecException { + NullableTuple ntup, int origIndex) throws ExecException { this.keyWritable = keyWritable; - return packagers.get(((int) index) & idxPart).getValueTuple( - keyWritable, ntup, index); + int index = origIndex & idxPart; + PigNullableWritable newKey = keyWritable; + if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) { + Tuple tup = (Tuple)this.keyWritable.getValueAsPigType(); + newKey = HDataType.getWritableComparableTypes(tup.get(0), packagers.get(index).getKeyType()); + newKey.setIndex((byte)origIndex); + } + return packagers.get(index).getValueTuple(newKey, ntup, origIndex); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1642132&r1=1642131&r2=1642132&view=diff 
============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Thu Nov 27 12:49:54 2014 @@ -19,8 +19,8 @@ package org.apache.pig.backend.hadoop.ex import java.util.ArrayList; import java.util.List; -import java.util.Iterator; +import org.apache.pig.PigConfiguration; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; @@ -38,9 +38,6 @@ import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.VisitorException; -import org.apache.pig.pen.util.ExampleTuple; -import org.apache.pig.pen.util.LineageTracer; -import org.apache.pig.impl.util.IdentityHashSet; /** * The collected group operator is a special operator used when users give @@ -71,7 +68,7 @@ public class POCollectedGroup extends Ph private Object prevKey = null; - private boolean useDefaultBag = false; + private transient boolean useDefaultBag; public POCollectedGroup(OperatorKey k) { this(k, -1, null); @@ -127,18 +124,14 @@ public class POCollectedGroup extends Ph @Override public Result getNextTuple() throws ExecException { - // Since the output is buffered, we need to flush the last - // set of records when the close method is called by mapper. 
- if (this.parentPlan.endOfAllInput) { - return getStreamCloseResult(); - } - Result inp = null; Result res = null; while (true) { inp = processInput(); if (inp.returnStatus == POStatus.STATUS_EOP) { + // Since the output is buffered, we need to flush the last + // set of records when the close method is called by mapper. if (this.parentPlan.endOfAllInput) { return getStreamCloseResult(); } else { @@ -172,7 +165,7 @@ public class POCollectedGroup extends Ph if (prevKey == null && outputBag == null) { if (PigMapReduce.sJobConfInternal.get() != null) { - String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type"); + String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_TYPE); if (bagType != null && bagType.equalsIgnoreCase("default")) { useDefaultBag = true; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Thu Nov 27 12:49:54 2014 @@ -23,6 +23,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigConfiguration; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; @@ -42,15 +43,18 @@ import org.apache.pig.impl.plan.VisitorE * Find the distinct set of tuples in a 
bag. * This is a blocking operator. All the input is put in the hashset implemented * in DistinctDataBag which also provides the other DataBag interfaces. - * - * + * + * */ public class PODistinct extends PhysicalOperator implements Cloneable { private static final Log log = LogFactory.getLog(PODistinct.class); private static final long serialVersionUID = 1L; private boolean inputsAccumulated = false; private DataBag distinctBag = null; - transient Iterator<Tuple> it; + + private transient boolean initialized; + private transient boolean useDefaultBag; + private transient Iterator<Tuple> it; // PIG-3385: Since GlobalRearrange is not used by PODistinct, passing the // custom partioner through here @@ -87,17 +91,19 @@ public class PODistinct extends Physical @Override public Result getNextTuple() throws ExecException { if (!inputsAccumulated) { - // by default, we create InternalSortedBag, unless user configures + // by default, we create InternalDistinctBag, unless user configures // explicitly to use old bag - String bagType = null; - if (PigMapReduce.sJobConfInternal.get() != null) { - bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.distinct.type"); - } - if (bagType != null && bagType.equalsIgnoreCase("default")) { - distinctBag = BagFactory.getInstance().newDistinctBag(); - } else { - distinctBag = new InternalDistinctBag(3); - } + if (!initialized) { + initialized = true; + if (PigMapReduce.sJobConfInternal.get() != null) { + String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_DISTINCT_TYPE); + if (bagType != null && bagType.equalsIgnoreCase("default")) { + useDefaultBag = true; + } + } + } + distinctBag = useDefaultBag ? 
BagFactory.getInstance().newDistinctBag() + : new InternalDistinctBag(3); Result in = processInput(); while (in.returnStatus != POStatus.STATUS_EOP) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Thu Nov 27 12:49:54 2014 @@ -92,6 +92,10 @@ public class POForEach extends PhysicalO protected Tuple inpTuple; + // Indicate the foreach statement can only in map side + // Currently only used in MR cross (See PIG-4175) + protected boolean mapSideOnly = false; + private Schema schema; public POForEach(OperatorKey k) { @@ -274,8 +278,9 @@ public class POForEach extends PhysicalO throw new ExecException(e); } }else{ - inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0); - // buffer.clear(); + if (buffer instanceof POPackage.POPackageTupleBuffer) { + inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0); + } setAccumEnd(); } @@ -293,7 +298,7 @@ public class POForEach extends PhysicalO break; } } - + buffer.clear(); } else { res = processPlan(); } @@ -786,4 +791,11 @@ public class POForEach extends PhysicalO return planLeafOps; } + public void setMapSideOnly(boolean mapSideOnly) { + this.mapSideOnly = mapSideOnly; + } + + public boolean isMapSideOnly() { + return mapSideOnly; + } }
