Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml (original) +++ pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml Fri Feb 24 03:34:37 2017 @@ -294,144 +294,6 @@ team_parkyearslist = FOREACH (GROUP team </section> </section> -<section id="bagtotuple"> - <title>BagToTuple</title> - <p>Un-nests the elements of a bag into a tuple.</p> - - <section> - <title>Syntax</title> - <table> - <tr> - <td> - <p>BagToTuple(expression)</p> - </td> - </tr> - </table></section> - - <section> - <title>Terms</title> - <table> - <tr> - <td> - <p>expression</p> - </td> - <td> - <p>An expression with data type bag.</p> - </td> - </tr> - </table> - </section> - - <section> - <title>Usage</title> - <p>BagToTuple creates a tuple from the elements of a bag. It removes only - the first level of nesting; it does not recursively un-nest nested bags. - Unlike FLATTEN, BagToTuple will not generate multiple output records per - input record. - </p> - </section> - <section> - <title>Examples</title> - <p>In this example, a bag containing tuples with one field is converted to a tuple.</p> -<source> -A = LOAD 'bag_data' AS (B1:bag{T1:tuple(f1:chararray)}); - -DUMP A; -({('a'),('b'),('c')}) -({('d'),('e'),('f')}) - -X = FOREACH A GENERATE BagToTuple(B1); - -DUMP X; -(('a','b','c')) -(('d','e','f')) -</source> - <p>In this example, a bag containing tuples with two fields is converted to a tuple.</p> -<source> -A = LOAD 'bag_data' AS (B1:bag{T1:tuple(f1:int,f2:int)}); - -DUMP A; -({(4,1),(7,8),(4,9)}) -({(5,8),(4,3),(3,8)}) - -X = FOREACH A GENERATE BagToTuple(B1); - -DUMP X; -((4,1,7,8,4,9)) -((5,8,4,3,3,8)) -</source> - </section> -</section> - -<section id="bagtotuple"> - <title>BagToTuple</title> - <p>Un-nests the elements of a bag into a tuple.</p> - - <section> - <title>Syntax</title> - <table> - <tr> - <td> - <p>BagToTuple(expression)</p> - </td> - </tr> - </table></section> - - <section> - <title>Terms</title> - <table> - <tr> - <td> - <p>expression</p> - </td> - <td> - <p>An expression with data type bag.</p> - </td> - </tr> - </table> - </section> - - <section> - <title>Usage</title> - <p>BagToTuple creates a tuple from the elements of a bag. It removes only - the first level of nesting; it does not recursively un-nest nested bags. - Unlike FLATTEN, BagToTuple will not generate multiple output records per - input record. 
- </p> - </section> - <section> - <title>Examples</title> - <p>In this example, a bag containing tuples with one field is converted to a tuple.</p> -<source> -A = LOAD 'bag_data' AS (B1:bag{T1:tuple(f1:chararray)}); - -DUMP A; -({('a'),('b'),('c')}) -({('d'),('e'),('f')}) - -X = FOREACH A GENERATE BagToTuple(B1); - -DUMP X; -(('a','b','c')) -(('d','e','f')) -</source> - <p>In this example, a bag containing tuples with two fields is converted to a tuple.</p> -<source> -A = LOAD 'bag_data' AS (B1:bag{T1:tuple(f1:int,f2:int)}); - -DUMP A; -({(4,1),(7,8),(4,9)}) -({(5,8),(4,3),(3,8)}) - -X = FOREACH A GENERATE BagToTuple(B1); - -DUMP X; -((4,1,7,8,4,9)) -((5,8,4,3,3,8)) -</source> - </section> -</section> - <section id="bloom"> <title>Bloom</title> <p>Bloom filters are a common way to select a limited set of records before @@ -701,80 +563,7 @@ DUMP X; </tr> </table> </section></section> - - -<!-- ++++++++++++++++++++++++++++++++++++++++++++++ --> - <section id="in"> - <title>IN</title> - <p>IN operator allows you to easily test if an expression matches any value in a list of values. It is used to reduce the need for multiple OR conditions.</p> - - <section> - <title>Syntax</title> - <table> - <tr> - <td> - <p>IN (expression)</p> - </td> - </tr> - </table></section> - - <section> - <title>Terms</title> - <table> - <tr> - <td> - <p>expression</p> - </td> - <td> - <p>An expression with data types chararray, int, long, float, double, bigdecimal, biginteger or bytearray.</p> - </td> - </tr> - </table></section> - - <section> - <title>Usage</title> - <p>IN operator allows you to easily test if an expression matches any value in a list of values. It is used to help reduce the need for multiple OR conditions.</p> - </section> - - <section> - <title>Example</title> - <p>In this example we filter out ID 4 and 6.</p> -<source> -A = load 'data' using PigStorage(',') AS (id:int, first:chararray, last:chararray, gender:chararray); - -DUMP A; -(1,Christine,Romero,Female) -(2,Sara,Hansen,Female) -(3,Albert,Rogers,Male) -(4,Kimberly,Morrison,Female) -(5,Eugene,Baker,Male) -(6,Ann,Alexander,Female) -(7,Kathleen,Reed,Female) -(8,Todd,Scott,Male) -(9,Sharon,Mccoy,Female) -(10,Evelyn,Rice,Female) - -X = FILTER A BY id IN (4, 6); -DUMP X; -(4,Kimberly,Morrison,Female) -(6,Ann,Alexander,Female) -</source> - </section> - -<p>In this example, we're passing a BigInteger and using NOT operator, thereby negating the passed list of fields in the IN clause</p> -<source> -A = load 'data' using PigStorage(',') AS (id:biginteger, first:chararray, last:chararray, gender:chararray); -X = FILTER A BY NOT id IN (1, 3, 5, 7, 9); -DUMP X; - -(2,Sara,Hansen,Female) -(4,Kimberly,Morrison,Female) -(6,Ann,Alexander,Female) -(8,Todd,Scott,Male) -(10,Evelyn,Rice,Female) -</source> -</section> - + <!-- ++++++++++++++++++++++++++++++++++++++++++++++ --> <section id="count-star"> <title>COUNT_STAR</title> @@ -1588,80 +1377,7 @@ DUMP X; </tr> </table> </section></section> - - -<!-- ++++++++++++++++++++++++++++++++++++++++++++++ --> - <section id="in"> - <title>IN</title> - <p>IN operator allows you to easily test if an expression matches any value in a list of values. 
It is used to reduce the need for multiple OR conditions.</p> - - <section> - <title>Syntax</title> - <table> - <tr> - <td> - <p>IN (expression)</p> - </td> - </tr> - </table></section> - - <section> - <title>Terms</title> - <table> - <tr> - <td> - <p>expression</p> - </td> - <td> - <p>An expression with data types chararray, int, long, float, double, bigdecimal, biginteger or bytearray.</p> - </td> - </tr> - </table></section> - - <section> - <title>Usage</title> - <p>IN operator allows you to easily test if an expression matches any value in a list of values. It is used to help reduce the need for multiple OR conditions.</p> - </section> - - <section> - <title>Example</title> - <p>In this example we filter out ID 4 and 6.</p> -<source> -A = load 'data' using PigStorage(',') AS (id:int, first:chararray, last:chararray, gender:chararray); - -DUMP A; -(1,Christine,Romero,Female) -(2,Sara,Hansen,Female) -(3,Albert,Rogers,Male) -(4,Kimberly,Morrison,Female) -(5,Eugene,Baker,Male) -(6,Ann,Alexander,Female) -(7,Kathleen,Reed,Female) -(8,Todd,Scott,Male) -(9,Sharon,Mccoy,Female) -(10,Evelyn,Rice,Female) - -X = FILTER A BY id IN (4, 6); -DUMP X; -(4,Kimberly,Morrison,Female) -(6,Ann,Alexander,Female) -</source> - </section> - -<p>In this example, we're passing a BigInteger and using NOT operator, thereby negating the passed list of fields in the IN clause</p> -<source> -A = load 'data' using PigStorage(',') AS (id:biginteger, first:chararray, last:chararray, gender:chararray); -X = FILTER A BY NOT id IN (1, 3, 5, 7, 9); -DUMP X; - -(2,Sara,Hansen,Female) -(4,Kimberly,Morrison,Female) -(6,Ann,Alexander,Female) -(8,Todd,Scott,Male) -(10,Evelyn,Rice,Female) -</source> -</section> - + <!-- ++++++++++++++++++++++++++++++++++++++++++++++ --> <section id="tokenize"> <title>TOKENIZE</title> @@ -1726,7 +1442,7 @@ DUMP X; <source> {code} A = LOAD 'data' AS (f1:chararray); -B = FOREACH A GENERATE TOKENIZE (f1,'||'); +B = FOREACH A TOKENIZE (f1,'||'); DUMP B; {code} </source>
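For reference, the IN examples removed above can also be exercised programmatically. A minimal sketch using Pig's public PigServer API (local mode and the 'data' file are assumptions for illustration, not part of this commit):

import java.util.Iterator;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;

public class InOperatorExample {
    public static void main(String[] args) throws Exception {
        // Local-mode session; 'data' is a hypothetical comma-delimited input file
        PigServer pig = new PigServer(ExecType.LOCAL);
        pig.registerQuery("A = LOAD 'data' USING PigStorage(',') AS (id:int, first:chararray, last:chararray, gender:chararray);");
        // IN replaces the equivalent chain of OR conditions: id == 4 OR id == 6
        pig.registerQuery("X = FILTER A BY id IN (4, 6);");
        Iterator<Tuple> it = pig.openIterator("X");
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}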
Modified: pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java (original) +++ pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java Fri Feb 24 03:34:37 2017 @@ -34,10 +34,10 @@ public class CounterBasedErrorHandler im public CounterBasedErrorHandler() { Configuration conf = UDFContext.getUDFContext().getJobConf(); - this.minErrors = conf.getLong(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS, + this.minErrors = conf.getLong(PigConfiguration.PIG_ERRORS_MIN_RECORDS, 0); this.errorThreshold = conf.getFloat( - PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT, 0.0f); + PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT, 0.0f); } @Override Modified: pig/branches/spark/src/org/apache/pig/EvalFunc.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/EvalFunc.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/EvalFunc.java (original) +++ pig/branches/spark/src/org/apache/pig/EvalFunc.java Fri Feb 24 03:34:37 2017 @@ -369,17 +369,4 @@ public abstract class EvalFunc<T> { public void setEndOfAllInput(boolean endOfAllInput) { } - - /** - * This will be called on both the front end and the back - * end during execution. - * @return the {@link LoadCaster} associated with this eval. Returning null - * indicates that casts from bytearray will pick the one associated with the - * parameters when they all come from the same loadcaster type. 
- * @throws IOException if there is an exception during LoadCaster - */ - public LoadCaster getLoadCaster() throws IOException { - return null; - } - } Modified: pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java (original) +++ pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java Fri Feb 24 03:34:37 2017 @@ -22,7 +22,6 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; @@ -48,7 +47,6 @@ public class JVMReuseImpl { PigGenericMapReduce.staticDataCleanup(); PigStatusReporter.staticDataCleanup(); PigCombiner.Combine.staticDataCleanup(); - DistinctCombiner.Combine.staticDataCleanup(); String className = null; String msg = null; Modified: pig/branches/spark/src/org/apache/pig/LoadFunc.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/LoadFunc.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/LoadFunc.java (original) +++ pig/branches/spark/src/org/apache/pig/LoadFunc.java Fri Feb 24 03:34:37 2017 @@ -108,7 +108,7 @@ public abstract class LoadFunc { public abstract InputFormat getInputFormat() throws IOException; /** - * This will be called on both the front end and the back + * This will be called on the front end during planning and not on the back * end during execution. * @return the {@link LoadCaster} associated with this loader. Returning null * indicates that casts from byte array are not supported for this loader. 
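The LoadFunc.getLoadCaster javadoc change above narrows the contract to front-end planning, so the returned caster must be constructible without backend-only state. A hedged sketch of a loader honoring that contract (the class itself is hypothetical; PigStorage and Utf8StorageConverter are Pig's real built-ins):

import java.io.IOException;
import org.apache.pig.LoadCaster;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.builtin.Utf8StorageConverter;

public class MyTextLoader extends PigStorage {
    @Override
    public LoadCaster getLoadCaster() throws IOException {
        // Returning a caster enables casts from bytearray; returning null
        // would tell Pig such casts are unsupported for this loader.
        return new Utf8StorageConverter();
    }
}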
Modified: pig/branches/spark/src/org/apache/pig/Main.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/Main.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/Main.java (original) +++ pig/branches/spark/src/org/apache/pig/Main.java Fri Feb 24 03:34:37 2017 @@ -27,6 +27,7 @@ import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.io.Reader; import java.io.StringReader; import java.net.URL; @@ -44,8 +45,9 @@ import java.util.jar.Attributes; import java.util.jar.JarFile; import java.util.jar.Manifest; -import jline.console.ConsoleReader; -import jline.console.history.FileHistory; +import jline.ConsoleReader; +import jline.ConsoleReaderInputStream; +import jline.History; import org.antlr.runtime.RecognitionException; import org.apache.commons.logging.Log; @@ -57,7 +59,6 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import org.apache.pig.PigRunner.ReturnCode; -import org.apache.pig.backend.BackendException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.classification.InterfaceAudience; @@ -75,7 +76,6 @@ import org.apache.pig.parser.DryRunGrunt import org.apache.pig.scripting.ScriptEngine; import org.apache.pig.scripting.ScriptEngine.SupportedScriptLang; import org.apache.pig.tools.cmdline.CmdLineParser; -import org.apache.pig.tools.grunt.ConsoleReaderInputStream; import org.apache.pig.tools.grunt.Grunt; import org.apache.pig.tools.pigstats.PigProgressNotificationListener; import org.apache.pig.tools.pigstats.PigStats; @@ -100,12 +100,13 @@ import com.google.common.io.Closeables; public class Main { static { - Utils.addShutdownHookWithPriority(new Runnable() { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override public void run() { FileLocalizer.deleteTempResourceFiles(); } - }, PigImplConstants.SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY); + }); } private final static Log log = LogFactory.getLog(Main.class); @@ -476,7 +477,7 @@ public class Main { } - logFileName = validateLogFile(logFileName, localFileRet.file); + logFileName = validateLogFile(logFileName, file); pigContext.getProperties().setProperty("pig.logfile", (logFileName == null? 
"": logFileName)); // Set job name based on name of the script @@ -487,7 +488,7 @@ public class Main { new File(substFile).deleteOnExit(); } - scriptState.setScript(localFileRet.file); + scriptState.setScript(new File(file)); grunt = new Grunt(pin, pigContext); gruntCalled = true; @@ -550,13 +551,12 @@ public class Main { } // Interactive mode = ExecMode.SHELL; - //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available - ConsoleReader reader = new ConsoleReader(Utils.getCompositeStream(System.in, properties), System.out); - reader.setExpandEvents(false); - reader.setPrompt("grunt> "); + //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available + ConsoleReader reader = new ConsoleReader(Utils.getCompositeStream(System.in, properties), new OutputStreamWriter(System.out)); + reader.setDefaultPrompt("grunt> "); final String HISTORYFILE = ".pig_history"; String historyFile = System.getProperty("user.home") + File.separator + HISTORYFILE; - reader.setHistory(new FileHistory(new File(historyFile))); + reader.setHistory(new History(new File(historyFile))); ConsoleReaderInputStream inputStream = new ConsoleReaderInputStream(reader); grunt = new Grunt(new BufferedReader(new InputStreamReader(inputStream)), pigContext); grunt.setConsoleReader(reader); @@ -605,7 +605,7 @@ public class Main { return ReturnCode.SUCCESS; } - logFileName = validateLogFile(logFileName, localFileRet.file); + logFileName = validateLogFile(logFileName, remainders[0]); pigContext.getProperties().setProperty("pig.logfile", (logFileName == null? "": logFileName)); if (!debug) { @@ -660,7 +660,6 @@ public class Main { if(!gruntCalled) { LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched"); } - killRunningJobsIfInterrupted(e, pigContext); } catch (Throwable e) { rc = ReturnCode.THROWABLE_EXCEPTION; PigStatsUtil.setErrorMessage(e.getMessage()); @@ -669,7 +668,6 @@ public class Main { if(!gruntCalled) { LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched"); } - killRunningJobsIfInterrupted(e, pigContext); } finally { if (printScriptRunTime) { printScriptRunTime(startTime); @@ -696,22 +694,6 @@ public class Main { + " (" + duration.getMillis() + " ms)"); } - private static void killRunningJobsIfInterrupted(Throwable e, PigContext pigContext) { - Throwable cause = e.getCause(); - // Kill running job when we get InterruptedException - // Pig thread is interrupted by mapreduce when Oozie launcher job is killed - // Shutdown hook kills running jobs, but sometimes NodeManager can issue - // a SIGKILL after AM unregisters and before shutdown hook gets to execute - // causing orphaned jobs that continue to run. 
- if (e instanceof InterruptedException || (cause != null && cause instanceof InterruptedException)) { - try { - pigContext.getExecutionEngine().kill(); - } catch (BackendException be) { - log.error("Error while killing running jobs", be); - } - } - } - protected static PigProgressNotificationListener makeListener(Properties properties) { try { @@ -989,10 +971,11 @@ public class Main { System.out.println("Additionally, any Hadoop property can be specified."); } - private static String validateLogFile(String logFileName, File scriptFile) { + private static String validateLogFile(String logFileName, String scriptName) { String strippedDownScriptName = null; - if (scriptFile != null) { + if(scriptName != null) { + File scriptFile = new File(scriptName); if(!scriptFile.isDirectory()) { String scriptFileAbsPath; try { Modified: pig/branches/spark/src/org/apache/pig/PigConfiguration.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigConfiguration.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/PigConfiguration.java (original) +++ pig/branches/spark/src/org/apache/pig/PigConfiguration.java Fri Feb 24 03:34:37 2017 @@ -18,7 +18,6 @@ package org.apache.pig; - /** * Container for static configuration strings, defaults, etc. This is intended just for keys that can * be set by users, not for keys that are generally used within pig. @@ -63,15 +62,9 @@ public class PigConfiguration { public static final String PIG_TEZ_OPT_UNION = "pig.tez.opt.union"; /** * These keys are used to enable or disable tez union optimization for - * specific StoreFuncs. Optimization should be turned off for those - * StoreFuncs that hard code part file names and do not prefix file names - * with mapreduce.output.basename configuration. - * - * If the StoreFuncs implement - * {@link StoreFunc#supportsParallelWriteToStoreLocation()} and return true - * or false then that is used to turn on or off union optimization - * respectively. These settings can be used for StoreFuncs that have not - * implemented the API yet. + * specific StoreFuncs so that optimization is only applied to StoreFuncs + * that do not hard code part file names and honor mapreduce.output.basename, + * and is turned off for those that do. Refer to PIG-4649. */ public static final String PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS = "pig.tez.opt.union.supported.storefuncs"; public static final String PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS = "pig.tez.opt.union.unsupported.storefuncs"; @@ -134,58 +127,6 @@ public class PigConfiguration { public static final String PIG_SKEWEDJOIN_REDUCE_MEM = "pig.skewedjoin.reduce.mem"; /** - * Bloom join has two different kinds of implementations. - * <ul> - * <li>map <br> - * In each map, bloom filters are computed on the join keys partitioned by - * the hashcode of the key with {@link #PIG_BLOOMJOIN_NUM_FILTERS} number of - * partitions. Bloom filters from different maps are then combined in the - * reducer producing one bloom filter per partition. This is efficient and - * fast if there is a smaller number of maps (<10) and the number of - * distinct keys is not too high.
It can be faster with larger number of - * maps and even with bigger bloom vector sizes, but the amount of data - * shuffled to the reducer for aggregation becomes huge making it - * inefficient.</li> - * <li>reduce <br> - * Join keys are sent from the map to the reducer partitioned by hashcode of - * the key with {@link #PIG_BLOOMJOIN_NUM_FILTERS} number of reducers. One - * bloom filter is then created per partition. This is efficient for larger - * datasets with lot of maps or very large - * {@link #PIG_BLOOMJOIN_VECTORSIZE_BYTES}. In this case size of keys sent - * to the reducer is smaller than sending bloom filters to reducer for - * aggregation making it efficient.</li> - * </ul> - * Default value is map. - */ - public static final String PIG_BLOOMJOIN_STRATEGY = "pig.bloomjoin.strategy"; - - /** - * The number of bloom filters that will be created. - * Default is 1 for map strategy and 11 for reduce strategy. - */ - public static final String PIG_BLOOMJOIN_NUM_FILTERS = "pig.bloomjoin.num.filters"; - - /** - * The size in bytes of the bit vector to be used for the bloom filter. - * A bigger vector size will be needed when the number of distinct keys is higher. - * Default value is 1048576 (1MB). - */ - public static final String PIG_BLOOMJOIN_VECTORSIZE_BYTES = "pig.bloomjoin.vectorsize.bytes"; - - /** - * The type of hash function to use. Valid values are jenkins and murmur. - * Default is murmur. - */ - public static final String PIG_BLOOMJOIN_HASH_TYPE = "pig.bloomjoin.hash.type"; - - /** - * The number of hash functions to be used in bloom computation. It determines the probability of false positives. - * Higher the number lower the false positives. Too high a value can increase the cpu time. - * Default value is 3. - */ - public static final String PIG_BLOOMJOIN_HASH_FUNCTIONS = "pig.bloomjoin.hash.functions"; - - /** * This key used to control the maximum size loaded into * the distributed cache when doing fragment-replicated join */ @@ -210,12 +151,6 @@ public class PigConfiguration { * This key is used to configure grace parallelism in tez. Default is true. */ public static final String PIG_TEZ_GRACE_PARALLELISM = "pig.tez.grace.parallelism"; - /** - * This key is used to turn off dag recovery if there is auto parallelism. - * Default is false. Useful when running with Tez versions before Tez 0.8 - * which have issues with auto parallelism during DAG recovery. - */ - public static final String PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY = "pig.tez.auto.parallelism.disable.dag.recovery"; /** * This key is used to configure compression for the pig input splits which @@ -389,17 +324,17 @@ public class PigConfiguration { /** * Boolean value used to enable or disable error handling for storers */ - public static final String PIG_ERROR_HANDLING_ENABLED = "pig.error-handling.enabled"; + public static final String PIG_ALLOW_STORE_ERRORS = "pig.allow.store.errors"; /** * Controls the minimum number of errors */ - public static final String PIG_ERROR_HANDLING_MIN_ERROR_RECORDS = "pig.error-handling.min.error.records"; + public static final String PIG_ERRORS_MIN_RECORDS = "pig.errors.min.records"; /** * Set the threshold for percentage of errors */ - public static final String PIG_ERROR_HANDLING_THRESHOLD_PERCENT = "pig.error-handling.error.threshold"; + public static final String PIG_ERROR_THRESHOLD_PERCENT = "pig.error.threshold.percent"; /** * Comma-delimited entries of commands/operators that must be disallowed. 
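To illustrate the renamed error-handling keys restored in the hunk above (the same keys CounterBasedErrorHandler reads earlier in this diff), a minimal sketch; the concrete values are illustrative assumptions, not defaults:

import java.util.Properties;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

public class StoreErrorHandlingExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Key strings as they appear on the + side of the PigConfiguration hunk above
        props.setProperty("pig.allow.store.errors", "true");      // enable error handling for storers
        props.setProperty("pig.errors.min.records", "10");        // minimum error records to tolerate
        props.setProperty("pig.error.threshold.percent", "0.05"); // error fraction read by CounterBasedErrorHandler
        PigServer pig = new PigServer(ExecType.LOCAL, props);
        // ... register and run a script whose STORE may hit bad records
    }
}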
@@ -476,31 +411,6 @@ public class PigConfiguration { */ public static final String PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE = "pig.spill.unused.memory.threshold.size"; - /** - * Log tracing id that can be used by upstream clients for tracking respective logs - */ - public static final String PIG_LOG_TRACE_ID = "pig.log.trace.id"; - - /** - * @deprecated use {@link #PIG_LOG_TRACE_ID} instead. Will be removed in Pig 0.18 - */ - public static final String CALLER_ID = PIG_LOG_TRACE_ID; - - /** - * Enable ATS for Pig - */ - public static final String PIG_ATS_ENABLED = "pig.ats.enabled"; - - /** - * @deprecated use {@link #PIG_ATS_ENABLED} instead. Will be removed in Pig 0.18 - */ - public static final String ENABLE_ATS = PIG_ATS_ENABLED; - - /** - * If set, Pig will override tez.am.launch.cmd-opts and tez.am.resource.memory.mb to optimal - */ - public static final String PIG_TEZ_CONFIGURE_AM_MEMORY = "pig.tez.configure.am.memory"; - // Deprecated settings of Pig 0.13 /** Modified: pig/branches/spark/src/org/apache/pig/PigServer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigServer.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/PigServer.java (original) +++ pig/branches/spark/src/org/apache/pig/PigServer.java Fri Feb 24 03:34:37 2017 @@ -25,8 +25,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; import java.io.StringReader; -import java.lang.reflect.Constructor; -import java.lang.reflect.Method; import java.net.URL; import java.util.ArrayList; import java.util.Collection; @@ -55,7 +53,6 @@ import org.apache.pig.backend.datastorag import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.executionengine.ExecJob; import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS; -import org.apache.pig.backend.hadoop.PigATSClient; import org.apache.pig.backend.hadoop.executionengine.HJob; import org.apache.pig.builtin.PigStorage; import org.apache.pig.classification.InterfaceAudience; @@ -244,54 +241,6 @@ public class PigServer { } PigStats.start(pigContext.getExecutionEngine().instantiatePigStats()); - // log ATS event includes the caller context - String auditId = PigATSClient.getPigAuditId(pigContext); - String callerId = (String)pigContext.getProperties().get(PigConfiguration.PIG_LOG_TRACE_ID); - log.info("Pig Script ID for the session: " + auditId); - if (callerId != null) { - log.info("Caller ID for session: " + callerId); - } - if (Boolean.parseBoolean(pigContext.getProperties() - .getProperty(PigConfiguration.PIG_ATS_ENABLED))) { - if (Boolean.parseBoolean(pigContext.getProperties() - .getProperty("yarn.timeline-service.enabled", "false"))) { - PigATSClient.ATSEvent event = new PigATSClient.ATSEvent(auditId, callerId); - try { - PigATSClient.getInstance().logEvent(event); - } catch (Exception e) { - log.warn("Error posting to ATS: ", e); - } - } else { - log.warn("ATS is disabled since" - + " yarn.timeline-service.enabled set to false"); - } - - } - - // set hdfs caller context - Class callerContextClass = null; - try { - callerContextClass = Class.forName("org.apache.hadoop.ipc.CallerContext"); - } catch (ClassNotFoundException e) { - // If pre-Hadoop 2.8.0, skip setting CallerContext - } - if (callerContextClass != null) { - try { - // Reflection for the following code since it is only available since hadoop 2.8.0: - // CallerContext hdfsContext = new 
CallerContext.Builder(auditId).build(); - // CallerContext.setCurrent(hdfsContext); - Class callerContextBuilderClass = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder"); - Constructor callerContextBuilderConstruct = callerContextBuilderClass.getConstructor(String.class); - Object builder = callerContextBuilderConstruct.newInstance(auditId); - Method builderBuildMethod = builder.getClass().getMethod("build"); - Object hdfsContext = builderBuildMethod.invoke(builder); - Method callerContextSetCurrentMethod = callerContextClass.getMethod("setCurrent", hdfsContext.getClass()); - callerContextSetCurrentMethod.invoke(callerContextClass, hdfsContext); - } catch (Exception e) { - // Shall not happen unless API change in future Hadoop commons - throw new ExecException(e); - } - } } private void addHadoopProperties() throws ExecException { @@ -663,8 +612,7 @@ public class PigServer { pigContext.scriptingUDFs.put(path, namespace); } - FetchFileRet ret = FileLocalizer.fetchFile(pigContext.getProperties(), path); - File f = ret.file; + File f = FileLocalizer.fetchFile(pigContext.getProperties(), path).file; if (!f.canRead()) { int errCode = 4002; String msg = "Can't read file: " + path; @@ -673,19 +621,9 @@ public class PigServer { } String cwd = new File(".").getCanonicalPath(); String filePath = f.getCanonicalPath(); - String nameInJar = filePath; - // Use the relative path in the jar, if the path specified is relative - if (!ret.didFetch) { - if (!new File(path).isAbsolute() && path.indexOf("." + File.separator) == -1) { - // In case of Oozie, the localized files are in a different - // directory symlinked to the current directory. Canonical path will not point to cwd. - nameInJar = path; - } else if (filePath.equals(cwd + File.separator + path)) { - // If user specified absolute path and it refers to cwd - nameInJar = filePath.substring(cwd.length() + 1); - } - } - + //Use the relative path in the jar, if the path specified is relative + String nameInJar = filePath.equals(cwd + File.separator + path) ? + filePath.substring(cwd.length() + 1) : filePath; pigContext.addScriptFile(nameInJar, filePath); if(scriptingLang != null) { ScriptEngine se = ScriptEngine.getInstance(scriptingLang); Modified: pig/branches/spark/src/org/apache/pig/StoreFunc.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/StoreFunc.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/StoreFunc.java (original) +++ pig/branches/spark/src/org/apache/pig/StoreFunc.java Fri Feb 24 03:34:37 2017 @@ -21,14 +21,17 @@ import java.io.IOException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; + import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger; import org.apache.pig.classification.InterfaceAudience; import org.apache.pig.classification.InterfaceStability; import org.apache.pig.data.Tuple; import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.tools.pigstats.PigStatusReporter; /** @@ -42,21 +45,21 @@ public abstract class StoreFunc implemen /** * This method is called by the Pig runtime in the front end to convert the * output location to an absolute path if the location is relative. 
The - * StoreFunc implementation is free to choose how it converts a relative + * StoreFunc implementation is free to choose how it converts a relative * location to an absolute location since this may depend on what the location - * string represent (hdfs path or some other data source). - * - * + * string represent (hdfs path or some other data source). + * + * * @param location location as provided in the "store" statement of the script * @param curDir the current working direction based on any "cd" statements * in the script before the "store" statement. If there are no "cd" statements - * in the script, this would be the home directory - + * in the script, this would be the home directory - * <pre>/user/<username> </pre> * @return the absolute location based on the arguments passed * @throws IOException if the conversion is not possible */ @Override - public String relToAbsPathForStoreLocation(String location, Path curDir) + public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException { return LoadFunc.getAbsolutePath(location, curDir); } @@ -64,34 +67,32 @@ public abstract class StoreFunc implemen /** * Return the OutputFormat associated with StoreFunc. This will be called * on the front end during planning and on the backend during - * execution. + * execution. * @return the {@link OutputFormat} associated with StoreFunc - * @throws IOException if an exception occurs while constructing the + * @throws IOException if an exception occurs while constructing the * OutputFormat * */ - @Override public abstract OutputFormat getOutputFormat() throws IOException; /** - * Communicate to the storer the location where the data needs to be stored. - * The location string passed to the {@link StoreFunc} here is the - * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} + * Communicate to the storer the location where the data needs to be stored. + * The location string passed to the {@link StoreFunc} here is the + * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} * This method will be called in the frontend and backend multiple times. Implementations * should bear in mind that this method is called multiple times and should * ensure there are no inconsistent side effects due to the multiple calls. * {@link #checkSchema(ResourceSchema)} will be called before any call to * {@link #setStoreLocation(String, Job)}. - * + * - * @param location Location returned by + * @param location Location returned by * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} * @param job The {@link Job} object * @throws IOException if the location is not valid. */ - @Override public abstract void setStoreLocation(String location, Job job) throws IOException; - + /** * Set the schema for data to be stored. This will be called on the * front end during planning if the store is associated with a schema. @@ -116,23 +117,21 @@ public abstract class StoreFunc implemen * @param writer RecordWriter to use. * @throws IOException if an exception occurs during initialization */ - @Override public abstract void prepareToWrite(RecordWriter writer) throws IOException; /** * Write a tuple to the data store. - * + * * @param t the tuple to store. 
* @throws IOException if an exception occurs during the write */ - @Override public abstract void putNext(Tuple t) throws IOException; - + /** * This method will be called by Pig both in the front end and back end to * pass a unique signature to the {@link StoreFunc} which it can use to store * information in the {@link UDFContext} which it needs to store between - * various method invocations in the front end and back end. This method + * various method invocations in the front end and back end. This method * will be called before other methods in {@link StoreFunc}. This is necessary * because in a Pig Latin script with multiple stores, the different * instances of store functions need to be able to find their (and only their) @@ -143,21 +142,21 @@ public abstract class StoreFunc implemen public void setStoreFuncUDFContextSignature(String signature) { // default implementation is a no-op } - + /** * This method will be called by Pig if the job which contains this store * fails. Implementations can clean up output locations in this method to * ensure that no incorrect/incomplete results are left in the output location. * The default implementation deletes the output location if it * is a {@link FileSystem} location. - * @param location Location returned by + * @param location Location returned by * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} - * @param job The {@link Job} object - this should be used only to obtain + * @param job The {@link Job} object - this should be used only to obtain * cluster properties through {@link Job#getConfiguration()} and not to set/query - * any runtime job information. + * any runtime job information. */ @Override - public void cleanupOnFailure(String location, Job job) + public void cleanupOnFailure(String location, Job job) throws IOException { cleanupOnFailureImpl(location, job); } @@ -167,19 +166,19 @@ public abstract class StoreFunc implemen * is successful, and some cleanup of intermediate resources is required. * Implementations can clean up output locations in this method to * ensure that no incorrect/incomplete results are left in the output location. - * @param location Location returned by + * @param location Location returned by * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} - * @param job The {@link Job} object - this should be used only to obtain + * @param job The {@link Job} object - this should be used only to obtain * cluster properties through {@link Job#getConfiguration()} and not to set/query - * any runtime job information. + * any runtime job information. */ @Override - public void cleanupOnSuccess(String location, Job job) + public void cleanupOnSuccess(String location, Job job) throws IOException { // DEFAULT: DO NOTHING, user-defined overrides can // call cleanupOnFailureImpl(location, job) or ...? } - + /** * Default implementation for {@link #cleanupOnFailure(String, Job)} * and {@link #cleanupOnSuccess(String, Job)}. This removes a file @@ -188,56 +187,15 @@ public abstract class StoreFunc implemen * @param job Hadoop job, used to access the appropriate file system. 
* @throws IOException */ - public static void cleanupOnFailureImpl(String location, Job job) - throws IOException { + public static void cleanupOnFailureImpl(String location, Job job) + throws IOException { Path path = new Path(location); FileSystem fs = path.getFileSystem(job.getConfiguration()); if(fs.exists(path)){ fs.delete(path, true); - } - } - - // TODO When dropping support for JDK 7 move this as a default method to StoreFuncInterface - /** - * DAG execution engines like Tez support optimizing union by writing to - * output location in parallel from tasks of different vertices. Commit is - * called once all the vertices in the union are complete. This eliminates - * need to have a separate phase to read data output from previous phases, - * union them and write out again. - * - * Enabling the union optimization requires the OutputFormat to - * - * 1) Support creation of different part file names for tasks of different - * vertices. Conflicting filenames can create data corruption and loss. - * For eg: If task 0 of vertex1 and vertex2 both create filename as - * part-r-00000, then one of the files will be overwritten when promoting - * from temporary to final location leading to data loss. - * FileOutputFormat has mapreduce.output.basename config which enables - * naming files differently in different vertices. Classes extending - * FileOutputFormat and those prefixing file names with mapreduce.output.basename - * value will not encounter conflict. Cases like HBaseStorage which write to key - * value store and do not produce files also should not face any conflict. - * - * 2) Support calling of commit once at the end takes care of promoting - * temporary files of the different vertices into the final location. - * For eg: FileOutputFormat commit algorithm handles promoting of files produced - * by tasks of different vertices into final output location without issues - * if there is no file name conflict. In cases like HBaseStorage, the - * TableOutputCommitter does nothing on commit. - * - * If custom OutputFormat used by the StoreFunc does not support the above - * two criteria, then false should be returned. Union optimization will be - * disabled for the StoreFunc. - * - * Default implementation returns null and in that case planner falls back - * to {@link PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS} and - * {@link PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS} - * settings to determine if the StoreFunc supports it. - */ - public Boolean supportsParallelWriteToStoreLocation() { - return null; + } } - + /** * Issue a warning. Warning messages are aggregated and reported to * the user. Modified: pig/branches/spark/src/org/apache/pig/StreamToPig.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/StreamToPig.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/StreamToPig.java (original) +++ pig/branches/spark/src/org/apache/pig/StreamToPig.java Fri Feb 24 03:34:37 2017 @@ -57,7 +57,7 @@ public interface StreamToPig { public Tuple deserialize(byte[] bytes) throws IOException; /** - * This will be called on both the front end and the back + * This will be called on the front end during planning and not on the back * end during execution. 
* * @return the {@link LoadCaster} associated with this object, or null if Modified: pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Fri Feb 24 03:34:37 2017 @@ -183,14 +183,6 @@ public interface ExecutionEngine { public ExecutableManager getExecutableManager(); /** - * This method is called when user requests to kill all jobs - * associated with the execution engine - * - * @throws BackendException - */ - public void kill() throws BackendException; - - /** * This method is called when a user requests to kill a job associated with * the given job id. If it is not possible for a user to kill a job, throw a * exception. It is imperative for the job id's being displayed to be unique Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java Fri Feb 24 03:34:37 2017 @@ -17,6 +17,8 @@ package org.apache.pig.backend.hadoop.accumulo; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.math.BigDecimal; import java.math.BigInteger; import java.util.Collection; @@ -301,8 +303,24 @@ public abstract class AbstractAccumuloSt */ protected void simpleUnset(Configuration conf, Map<String, String> entriesToUnset) { - for (String key : entriesToUnset.keySet()) { - conf.unset(key); + try { + Method unset = conf.getClass().getMethod("unset", String.class); + + for (String key : entriesToUnset.keySet()) { + unset.invoke(conf, key); + } + } catch (NoSuchMethodException e) { + log.error("Could not invoke Configuration.unset method", e); + throw new RuntimeException(e); + } catch (IllegalAccessException e) { + log.error("Could not invoke Configuration.unset method", e); + throw new RuntimeException(e); + } catch (IllegalArgumentException e) { + log.error("Could not invoke Configuration.unset method", e); + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + log.error("Could not invoke Configuration.unset method", e); + throw new RuntimeException(e); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java Fri Feb 24 03:34:37 2017 @@ -22,6 +22,8 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; 
+import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.URL; import java.net.URLDecoder; import java.text.MessageFormat; @@ -40,7 +42,6 @@ import java.util.zip.ZipOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.StringUtils; import org.apache.log4j.Logger; @@ -111,7 +112,7 @@ public class Utils { // attempt to locate an existing jar for the class. String jar = findContainingJar(my_class, packagedClasses); if (null == jar || jar.isEmpty()) { - jar = JarFinder.getJar(my_class); + jar = getJar(my_class); updateMap(jar, packagedClasses); } @@ -199,6 +200,41 @@ public class Utils { } /** + * Invoke 'getJar' on a JarFinder implementation. Useful for some job + * configuration contexts (HBASE-8140) and also for testing on MRv2. First + * check if we have HADOOP-9426. Lacking that, fall back to the backport. + * + * @param my_class + * the class to find. + * @return a jar file that contains the class, or null. + */ + private static String getJar(Class<?> my_class) { + String ret = null; + String hadoopJarFinder = "org.apache.hadoop.util.JarFinder"; + Class<?> jarFinder = null; + try { + log.debug("Looking for " + hadoopJarFinder + "."); + jarFinder = Class.forName(hadoopJarFinder); + log.debug(hadoopJarFinder + " found."); + Method getJar = jarFinder.getMethod("getJar", Class.class); + ret = (String) getJar.invoke(null, my_class); + } catch (ClassNotFoundException e) { + log.debug("Using backported JarFinder."); + ret = jarFinderGetJar(my_class); + } catch (InvocationTargetException e) { + // function was properly called, but threw its own exception. + // Unwrap it + // and pass it on. + throw new RuntimeException(e.getCause()); + } catch (Exception e) { + // toss all other exceptions related to reflection failure + throw new RuntimeException("getJar invocation failed.", e); + } + + return ret; + } + + /** * Returns the full path to the Jar containing the class. It always returns a * JAR. * Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java Fri Feb 24 03:34:37 2017 @@ -29,6 +29,7 @@ import org.apache.pig.ExecType; import org.apache.pig.PigConstants; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; public class ConfigurationUtil { @@ -88,7 +89,7 @@ public class ConfigurationUtil { // so build/classes/hadoop-site.xml contains such entry. This prevents some tests from // being successful (they expect those files in hdfs), so we need to unset it in hadoop 23. // This should go away once MiniMRCluster fixes the distributed cache issue.
- localConf.unset(MRConfiguration.JOB_CACHE_FILES); + HadoopShims.unsetConf(localConf, MRConfiguration.JOB_CACHE_FILES); } localConf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); Properties props = ConfigurationUtil.toProperties(localConf); @@ -105,14 +106,4 @@ public class ConfigurationUtil { } } } - - /** - * Returns Properties containing alternative names of given property and same values - can be used to solve deprecations - * @return - */ - public static Properties expandForAlternativeNames(String name, String value){ - final Configuration config = new Configuration(false); - config.set(name,value); - return ConfigurationUtil.toProperties(config); - } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java Fri Feb 24 03:34:37 2017 @@ -18,20 +18,20 @@ package org.apache.pig.backend.hadoop.datastorage; -import java.io.IOException; import java.net.URI; +import java.io.IOException; import java.util.ArrayList; -import java.util.Enumeration; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Properties; +import java.util.Enumeration; +import java.util.Map; +import java.util.HashMap; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.conf.Configuration; import org.apache.pig.PigException; import org.apache.pig.backend.datastorage.ContainerDescriptor; import org.apache.pig.backend.datastorage.DataStorage; @@ -40,6 +40,8 @@ import org.apache.pig.backend.datastorag public class HDataStorage implements DataStorage { + private static final String FILE_SYSTEM_LOCATION = "fs.default.name"; + private FileSystem fs; private Configuration configuration; private Properties properties; @@ -56,10 +58,9 @@ public class HDataStorage implements Dat init(); } - @Override public void init() { // check if name node is set, if not we set local as fail back - String nameNode = this.properties.getProperty(FileSystem.FS_DEFAULT_NAME_KEY); + String nameNode = this.properties.getProperty(FILE_SYSTEM_LOCATION); if (nameNode == null || nameNode.length() == 0) { nameNode = "local"; } @@ -75,17 +76,14 @@ public class HDataStorage implements Dat } } - @Override public void close() throws IOException { fs.close(); } - - @Override + public Properties getConfiguration() { return this.properties; } - @Override public void updateConfiguration(Properties newConfiguration) throws DataStorageException { // TODO sgroschupf 25Feb2008 this method is never called and @@ -94,40 +92,38 @@ public class HDataStorage implements Dat if (newConfiguration == null) { return; } - + Enumeration<Object> newKeys = newConfiguration.keys(); - + while (newKeys.hasMoreElements()) { String key = (String) newKeys.nextElement(); String value = null; - + value = newConfiguration.getProperty(key); - + fs.getConf().set(key,value); } } - - @Override + public Map<String, Object> getStatistics() throws IOException { Map<String, Object> stats = new HashMap<String, Object>(); long 
usedBytes = fs.getUsed(); stats.put(USED_BYTES_KEY , Long.valueOf(usedBytes).toString()); - + if (fs instanceof DistributedFileSystem) { DistributedFileSystem dfs = (DistributedFileSystem) fs; - + long rawCapacityBytes = dfs.getRawCapacity(); stats.put(RAW_CAPACITY_KEY, Long.valueOf(rawCapacityBytes).toString()); - + long rawUsedBytes = dfs.getRawUsed(); stats.put(RAW_USED_KEY, Long.valueOf(rawUsedBytes).toString()); } - + return stats; } - - @Override + public ElementDescriptor asElement(String name) throws DataStorageException { if (this.isContainer(name)) { return new HDirectory(this, name); @@ -136,82 +132,70 @@ public class HDataStorage implements Dat return new HFile(this, name); } } - - @Override + public ElementDescriptor asElement(ElementDescriptor element) throws DataStorageException { return asElement(element.toString()); } - - @Override + public ElementDescriptor asElement(String parent, - String child) + String child) throws DataStorageException { return asElement((new Path(parent, child)).toString()); } - @Override public ElementDescriptor asElement(ContainerDescriptor parent, - String child) + String child) throws DataStorageException { return asElement(parent.toString(), child); } - @Override public ElementDescriptor asElement(ContainerDescriptor parent, - ElementDescriptor child) + ElementDescriptor child) throws DataStorageException { return asElement(parent.toString(), child.toString()); } - @Override - public ContainerDescriptor asContainer(String name) + public ContainerDescriptor asContainer(String name) throws DataStorageException { return new HDirectory(this, name); } - - @Override + public ContainerDescriptor asContainer(ContainerDescriptor container) throws DataStorageException { return new HDirectory(this, container.toString()); } - - @Override + public ContainerDescriptor asContainer(String parent, - String child) + String child) throws DataStorageException { return new HDirectory(this, parent, child); } - @Override public ContainerDescriptor asContainer(ContainerDescriptor parent, - String child) + String child) throws DataStorageException { return new HDirectory(this, parent.toString(), child); } - - @Override + public ContainerDescriptor asContainer(ContainerDescriptor parent, ContainerDescriptor child) throws DataStorageException { return new HDirectory(this, parent.toString(), child.toString()); } - - @Override + public void setActiveContainer(ContainerDescriptor container) { fs.setWorkingDirectory(new Path(container.toString())); } - - @Override + public ContainerDescriptor getActiveContainer() { return new HDirectory(this, fs.getWorkingDirectory()); } - @Override public boolean isContainer(String name) throws DataStorageException { boolean isContainer = false; Path path = new Path(name); - + try { if ((this.fs.exists(path)) && (! 
this.fs.isFile(path))) { isContainer = true; @@ -222,11 +206,10 @@ public class HDataStorage implements Dat String msg = "Unable to check name " + name; throw new DataStorageException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e); } - + return isContainer; } - - @Override + public HPath[] asCollection(String pattern) throws DataStorageException { try { FileStatus[] paths = this.fs.globStatus(new Path(pattern)); @@ -235,7 +218,7 @@ public class HDataStorage implements Dat return new HPath[0]; List<HPath> hpaths = new ArrayList<HPath>(); - + for (int i = 0; i < paths.length; ++i) { HPath hpath = (HPath)this.asElement(paths[i].getPath().toString()); if (!hpath.systemElement()) { @@ -250,7 +233,7 @@ public class HDataStorage implements Dat throw new DataStorageException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e); } } - + public FileSystem getHFS() { return fs; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Feb 24 03:34:37 2017 @@ -30,7 +30,6 @@ import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.mapred.JobConf; import org.apache.pig.PigException; @@ -77,6 +76,8 @@ public abstract class HExecutionEngine i public static final String MAPRED_DEFAULT_SITE = "mapred-default.xml"; public static final String YARN_DEFAULT_SITE = "yarn-default.xml"; + public static final String FILE_SYSTEM_LOCATION = "fs.default.name"; + public static final String ALTERNATIVE_FILE_SYSTEM_LOCATION = "fs.defaultFS"; public static final String LOCAL = "local"; protected PigContext pigContext; @@ -202,8 +203,8 @@ public abstract class HExecutionEngine i properties.setProperty(MRConfiguration.FRAMEWORK_NAME, LOCAL); } properties.setProperty(MRConfiguration.JOB_TRACKER, LOCAL); - properties.remove("fs.default.name"); //Deprecated in Hadoop 2.x - properties.setProperty(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); + properties.setProperty(FILE_SYSTEM_LOCATION, "file:///"); + properties.setProperty(ALTERNATIVE_FILE_SYSTEM_LOCATION, "file:///"); jc = getLocalConf(); JobConf s3Jc = getS3Conf(); @@ -219,7 +220,24 @@ public abstract class HExecutionEngine i HKerberos.tryKerberosKeytabLogin(jc); cluster = jc.get(MRConfiguration.JOB_TRACKER); - nameNode = jc.get(FileSystem.FS_DEFAULT_NAME_KEY); + nameNode = jc.get(FILE_SYSTEM_LOCATION); + if (nameNode == null) { + nameNode = (String) pigContext.getProperties().get(ALTERNATIVE_FILE_SYSTEM_LOCATION); + } + + if (cluster != null && cluster.length() > 0) { + if (!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) { + cluster = cluster + ":50020"; + } + properties.setProperty(MRConfiguration.JOB_TRACKER, cluster); + } + + if (nameNode != null && nameNode.length() > 0) { + if (!nameNode.contains(":") && !nameNode.equalsIgnoreCase(LOCAL)) { + nameNode = nameNode + ":8020"; + } + properties.setProperty(FILE_SYSTEM_LOCATION, nameNode); + } 
LOG.info("Connecting to hadoop file system at: " + (nameNode == null ? LOCAL : nameNode)); @@ -351,11 +369,7 @@ public abstract class HExecutionEngine i @Override public void setProperty(String property, String value) { Properties properties = pigContext.getProperties(); - if (Configuration.isDeprecated(property)) { - properties.putAll(ConfigurationUtil.expandForAlternativeNames(property, value)); - } else { - properties.put(property, value); - } + properties.put(property, value); } @Override @@ -364,13 +378,6 @@ public abstract class HExecutionEngine i } @Override - public void kill() throws BackendException { - if (launcher != null) { - launcher.kill(); - } - } - - @Override public void killJob(String jobID) throws BackendException { if (launcher != null) { launcher.killJob(jobID, getJobConf()); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Fri Feb 24 03:34:37 2017 @@ -40,7 +40,7 @@ import org.apache.pig.tools.pigstats.Pig public class HJob implements ExecJob { private final Log log = LogFactory.getLog(getClass()); - + protected JOB_STATUS status; protected PigContext pigContext; protected FileSpec outFileSpec; @@ -48,7 +48,7 @@ public class HJob implements ExecJob { protected String alias; protected POStore poStore; private PigStats stats; - + public HJob(JOB_STATUS status, PigContext pigContext, POStore store, @@ -59,7 +59,7 @@ public class HJob implements ExecJob { this.outFileSpec = poStore.getSFile(); this.alias = alias; } - + public HJob(JOB_STATUS status, PigContext pigContext, POStore store, @@ -72,41 +72,37 @@ public class HJob implements ExecJob { this.alias = alias; this.stats = stats; } - - @Override + public JOB_STATUS getStatus() { return status; } - - @Override + public boolean hasCompleted() throws ExecException { return true; } - - @Override + public Iterator<Tuple> getResults() throws ExecException { final LoadFunc p; - + try{ - LoadFunc originalLoadFunc = + LoadFunc originalLoadFunc = (LoadFunc)PigContext.instantiateFuncFromSpec( outFileSpec.getFuncSpec()); - - p = (LoadFunc) new ReadToEndLoader(originalLoadFunc, + + p = (LoadFunc) new ReadToEndLoader(originalLoadFunc, ConfigurationUtil.toConfiguration( - pigContext.getProperties()), outFileSpec.getFileName(), 0); + pigContext.getProperties()), outFileSpec.getFileName(), 0, pigContext); }catch (Exception e){ int errCode = 2088; String msg = "Unable to get results for: " + outFileSpec; throw new ExecException(msg, errCode, PigException.BUG, e); } - + return new Iterator<Tuple>() { Tuple t; boolean atEnd; - @Override public boolean hasNext() { if (atEnd) return false; @@ -124,7 +120,6 @@ public class HJob implements ExecJob { return !atEnd; } - @Override public Tuple next() { Tuple next = t; if (next != null) { @@ -141,7 +136,6 @@ public class HJob implements ExecJob { return next; } - @Override public void remove() { throw new RuntimeException("Removal not supported"); } @@ -149,38 +143,31 @@ public class HJob implements ExecJob { }; } - @Override public Properties getConfiguration() { return pigContext.getProperties(); } - @Override public PigStats getStatistics() { 
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Fri Feb 24 03:34:37 2017
@@ -32,8 +32,7 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.TIPStatus;
-import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapred.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.FuncSpec;
@@ -41,6 +40,7 @@ import org.apache.pig.PigException;
 import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.PlanException;
@@ -76,7 +76,7 @@ public abstract class Launcher {
     protected Map<FileSpec, Exception> failureMap;
     protected JobControl jc = null;
 
-    protected class HangingJobKiller extends Thread {
+    class HangingJobKiller extends Thread {
         public HangingJobKiller() {}
 
         @Override
@@ -90,6 +90,7 @@ public abstract class Launcher {
     }
 
     protected Launcher() {
+        Runtime.getRuntime().addShutdownHook(new HangingJobKiller());
         // handle the windows portion of \r
         if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
             newLine = "\r\n";
@@ -103,6 +104,7 @@ public abstract class Launcher {
     public void reset() {
         failureMap = Maps.newHashMap();
         totalHadoopTimeSpent = 0;
+        jc = null;
     }
 
     /**
@@ -177,7 +179,7 @@ public abstract class Launcher {
         String exceptionCreateFailMsg = null;
         boolean jobFailed = false;
         if (msgs.length > 0) {
-            if (report.getCurrentStatus()== TIPStatus.FAILED) {
+            if (HadoopShims.isJobFailed(report)) {
                 jobFailed = true;
             }
             Set<String> errorMessageSet = new HashSet<String>();
@@ -259,30 +261,11 @@ public abstract class Launcher {
         List<Job> runnJobs = jc.getRunningJobs();
         for (Job j : runnJobs) {
-            prog += progressOfRunningJob(j);
+            prog += HadoopShims.progressOfRunningJob(j);
         }
         return prog;
     }
 
-    /**
-     * Returns the progress of a Job j which is part of a submitted JobControl
-     * object. The progress is for this Job. So it has to be scaled down by the
-     * num of jobs that are present in the JobControl.
-     *
-     * @param j The Job for which progress is required
-     * @return Returns the percentage progress of this Job
-     * @throws IOException
-     */
-    private static double progressOfRunningJob(Job j)
-            throws IOException {
-        org.apache.hadoop.mapreduce.Job mrJob = j.getJob();
-        try {
-            return (mrJob.mapProgress() + mrJob.reduceProgress()) / 2;
-        } catch (Exception ir) {
-            return 0;
-        }
-    }
-
     public long getTotalHadoopTimeSpent() {
         return totalHadoopTimeSpent;
     }
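
The progressOfRunningJob helper removed above averages a job's map and reduce progress; the caller then scales the sum by the number of jobs in the JobControl. A minimal sketch of that arithmetic follows, detached from the Hadoop job-control types; the class and method names are illustrative.

    // Progress math used above: each running job contributes the mean of its
    // map and reduce progress; the plan total is scaled by the job count.
    final class ProgressMath {
        /** Progress of one job, both phase values in [0, 1]. */
        static double jobProgress(double mapProgress, double reduceProgress) {
            return (mapProgress + reduceProgress) / 2;
        }

        /** Overall progress of a plan of totalJobs, given per-job running progress. */
        static double planProgress(double[] runningJobProgress, int completedJobs, int totalJobs) {
            double prog = completedJobs;           // finished jobs count as 1.0 each
            for (double p : runningJobProgress) {
                prog += p;
            }
            return prog / totalJobs;
        }

        public static void main(String[] args) {
            double j = jobProgress(1.0, 0.5);                        // 0.75
            System.out.println(planProgress(new double[]{j}, 2, 4)); // (2 + 0.75) / 4 = 0.6875
        }
    }
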
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Fri Feb 24 03:34:37 2017
@@ -25,7 +25,6 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
@@ -123,8 +122,7 @@ public class FetchLauncher {
         poStore.setUp();
 
         TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID();
-        //Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
-        conf.setInt(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, taskAttemptID.getId());
+        HadoopShims.setTaskAttemptId(conf, taskAttemptID);
 
         if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
             MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java Fri Feb 24 03:34:37 2017
@@ -95,7 +95,7 @@ public class FetchPOStoreImpl extends PO
         }
         if (outputCommitter.needsTaskCommit(context))
             outputCommitter.commitTask(context);
-        outputCommitter.commitJob(context);
+        HadoopShims.commitOrCleanup(outputCommitter, context);
     }
 
     @Override
@@ -109,7 +109,7 @@ public class FetchPOStoreImpl extends PO
             }
             writer = null;
         }
-        outputCommitter.commitJob(context);
+        HadoopShims.commitOrCleanup(outputCommitter, context);
     }
 }
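
Both FetchPOStoreImpl call sites are rerouted through a HadoopShims.commitOrCleanup helper because older Hadoop releases finished a job with OutputCommitter.cleanupJob() while newer ones use commitJob(). A plausible sketch of such a shim follows; the runtime switch is an assumption for illustration only, since the real shim is selected per Hadoop build profile rather than at runtime.

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;

    // Hedged sketch of a commit/cleanup version shim; USE_COMMIT_JOB is an
    // illustrative stand-in for a per-Hadoop-version build decision.
    final class CommitShim {
        private static final boolean USE_COMMIT_JOB = true; // assumption: Hadoop 2-style build

        @SuppressWarnings("deprecation")
        static void commitOrCleanup(OutputCommitter committer, JobContext context)
                throws IOException {
            if (USE_COMMIT_JOB) {
                committer.commitJob(context);
            } else {
                committer.cleanupJob(context); // deprecated, pre-commitJob API
            }
        }
    }
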
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java Fri Feb 24 03:34:37 2017
@@ -22,48 +22,43 @@ import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.pig.impl.io.NullableTuple;
+
 import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.io.NullableTuple;
 
 /**
  * A special implementation of combiner used only for distinct. This combiner
  * does not even parse out the records. It just throws away duplicate values
- * in the key in order to minimize the data being sent to the reduce.
+ * in the key in order ot minimize the data being sent to the reduce.
  */
 public class DistinctCombiner {
 
-    public static class Combine
+    public static class Combine
         extends Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
-
+
         private final Log log = LogFactory.getLog(getClass());
-        private static boolean firstTime = true;
-
-        //@StaticDataCleanup
-        public static void staticDataCleanup() {
-            firstTime = true;
-        }
-
+        ProgressableReporter pigReporter;
+
+        /**
+         * Configures the reporter
+         */
         @Override
         protected void setup(Context context) throws IOException, InterruptedException {
             super.setup(context);
-            Configuration jConf = context.getConfiguration();
-            // Avoid log spamming
-            if (firstTime) {
-                log.info("Aliases being processed per job phase (AliasName[line,offset]): "
-                        + jConf.get("pig.alias.location"));
-                firstTime = false;
-            }
+            pigReporter = new ProgressableReporter();
         }
-
+
         /**
         * The reduce function which removes values.
         */
        @Override
-        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
+        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
                throws IOException, InterruptedException {
+
+            pigReporter.setRep(context);
 
            // Take the first value and the key and collect
            // just that.
@@ -71,7 +66,6 @@ public class DistinctCombiner {
            NullableTuple val = iter.next();
            context.write(key, val);
        }
-
    }
-
+
 }
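
The combiner above implements DISTINCT by emitting only the first value per key: the whole record travels as the key, so any further values under that key are duplicates by construction. A minimal sketch of that reduce step against the standard mapreduce Reducer API follows; Text stands in for Pig's PigNullableWritable/NullableTuple writables.

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Sketch of the distinct-combiner idea above: one value per key suffices.
    public class DistinctSketchCombiner extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // reduce() is only invoked with at least one value, so next() is safe.
            Iterator<Text> iter = values.iterator();
            // Emit only the first value; duplicates add nothing for DISTINCT.
            context.write(key, iter.next());
        }
    }
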
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java Fri Feb 24 03:34:37 2017
@@ -75,24 +75,16 @@ public class FileBasedOutputSizeReader i
             return -1;
         }
 
-        Path p = new Path(getLocationUri(sto));
-        return getPathSize(p, p.getFileSystem(conf));
-    }
-
-    private long getPathSize(Path storePath, FileSystem fs) throws IOException {
         long bytes = 0;
-        FileStatus[] lst = fs.listStatus(storePath);
+        Path p = new Path(getLocationUri(sto));
+        FileSystem fs = p.getFileSystem(conf);
+        FileStatus[] lst = fs.listStatus(p);
         if (lst != null) {
             for (FileStatus status : lst) {
-                if (status.isFile()) {
-                    if (status.getLen() > 0)
-                        bytes += status.getLen();
-                }
-                else { // recursively count nested leaves' (files) sizes
-                    bytes += getPathSize(status.getPath(), fs);
-                }
+                bytes += status.getLen();
             }
         }
+
         return bytes;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Fri Feb 24 03:34:37 2017
@@ -92,7 +92,7 @@ public class InputSizeReducerEstimator i
         return reducers;
     }
 
-    public static long getTotalInputFileSize(Configuration conf,
+    static long getTotalInputFileSize(Configuration conf,
             List<POLoad> lds, Job job) throws IOException {
         return getTotalInputFileSize(conf, lds, job, Long.MAX_VALUE);
     }
@@ -100,7 +100,7 @@ public class InputSizeReducerEstimator i
     /**
      * Get the input size for as many inputs as possible. Inputs that do not report
      * their size nor can pig look that up itself are excluded from this size.
-     *
+     *
      * @param conf Configuration
      * @param lds List of POLoads
      * @param job Job
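
The getPathSize helper removed above (left-hand side of the hunk) summed file lengths recursively, so part files inside nested output directories still counted; the spark-branch version only sums the top-level listing. For comparison, a minimal sketch of the recursive variant follows, assuming standard Hadoop FileSystem APIs; the class name is illustrative.

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch of the recursive size computation dropped above: directories are
    // descended so that nested part files still count toward the output size.
    final class PathSize {
        static long getPathSize(Path path, FileSystem fs) throws IOException {
            long bytes = 0;
            FileStatus[] listing = fs.listStatus(path);
            if (listing != null) {
                for (FileStatus status : listing) {
                    if (status.isFile()) {
                        bytes += status.getLen();                   // leaf: add its length
                    } else {
                        bytes += getPathSize(status.getPath(), fs); // recurse into dirs
                    }
                }
            }
            return bytes;
        }
    }
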
