Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml Wed Feb 22 09:43:41 2017
@@ -294,6 +294,75 @@ team_parkyearslist = FOREACH (GROUP team
 </section>
 </section>
 
+<section id="bagtotuple">
+   <title>BagToTuple</title>
+   <p>Un-nests the elements of a bag into a tuple.</p>
+
+   <section>
+   <title>Syntax</title>
+   <table>
+       <tr>
+            <td>
+               <p>BagToTuple(expression)</p>
+            </td>
+       </tr>
+   </table></section>
+
+   <section>
+   <title>Terms</title>
+   <table>
+       <tr>
+            <td>
+               <p>expression</p>
+            </td>
+            <td>
+               <p>An expression with data type bag.</p>
+            </td>
+       </tr>
+   </table>
+   </section>
+
+   <section>
+   <title>Usage</title>
+   <p>BagToTuple creates a tuple from the elements of a bag. It removes only
+   the first level of nesting; it does not recursively un-nest nested bags.
+   Unlike FLATTEN, BagToTuple will not generate multiple output records per
+   input record.
+   </p>
+   </section>
+   <section>
+   <title>Examples</title>
+   <p>In this example, a bag containing tuples with one field is converted to a tuple.</p>
+<source>
+A = LOAD 'bag_data' AS (B1:bag{T1:tuple(f1:chararray)});
+
+DUMP A;
+({('a'),('b'),('c')})
+({('d'),('e'),('f')})
+
+X = FOREACH A GENERATE BagToTuple(B1);
+
+DUMP X;
+(('a','b','c'))
+(('d','e','f'))
+</source>
+   <p>In this example, a bag containing tuples with two fields is converted to a tuple.</p>
+<source>
+A = LOAD 'bag_data' AS (B1:bag{T1:tuple(f1:int,f2:int)});
+
+DUMP A;
+({(4,1),(7,8),(4,9)})
+({(5,8),(4,3),(3,8)})
+
+X = FOREACH A GENERATE BagToTuple(B1);
+
+DUMP X;
+((4,1,7,8,4,9))
+((5,8,4,3,3,8))
+</source>
+   </section>
+</section>
+
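The semantics are easy to sanity-check outside a script by calling the builtin directly from Java. The following is a minimal sketch, not part of this patch; it assumes the stock org.apache.pig.builtin.BagToTuple UDF and the standard tuple and bag factories, and mirrors the first example above.

    import org.apache.pig.builtin.BagToTuple;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class BagToTupleDemo {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();
            BagFactory bf = BagFactory.getInstance();

            // Build the bag {('a'),('b'),('c')} from the first example above.
            DataBag bag = bf.newDefaultBag();
            bag.add(tf.newTuple("a"));
            bag.add(tf.newTuple("b"));
            bag.add(tf.newTuple("c"));

            // An EvalFunc receives its arguments wrapped in a tuple.
            Tuple input = tf.newTuple(bag);
            Tuple result = new BagToTuple().exec(input);
            System.out.println(result); // prints (a,b,c): one tuple, one record
        }
    }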
 <section id="bloom">
 <title>Bloom</title>
 <p>Bloom filters are a common way to select a limited set of records before
@@ -563,7 +701,80 @@ DUMP X;
 </tr>
 </table>
 </section></section>
-
+
+
+<!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
+  <section id="in">
+  <title>IN</title>
+   <p>The IN operator tests whether an expression matches any value in a list of values. It reduces the need for multiple OR conditions.</p>
+
+   <section>
+   <title>Syntax</title>
+   <table>
+       <tr>
+            <td>
+               <p>IN (expression)</p>
+            </td>
+       </tr>
+   </table></section>
+
+   <section>
+   <title>Terms</title>
+   <table>
+       <tr>
+            <td>
+               <p>expression</p>
+            </td>
+            <td>
+               <p>An expression with data types chararray, int, long, float, double, bigdecimal, biginteger or bytearray.</p>
+            </td>
+       </tr>
+   </table></section>
+
+   <section>
+   <title>Usage</title>
+   <p>The IN operator tests whether an expression matches any value in a list of values, avoiding a chain of OR conditions.</p>
+   </section>
+
+   <section>
+   <title>Example</title>
+   <p>In this example we keep only the records with ID 4 or 6.</p>
+<source>
+A = load 'data' using PigStorage(',') AS (id:int, first:chararray, last:chararray, gender:chararray);
+
+DUMP A;
+(1,Christine,Romero,Female)
+(2,Sara,Hansen,Female)
+(3,Albert,Rogers,Male)
+(4,Kimberly,Morrison,Female)
+(5,Eugene,Baker,Male)
+(6,Ann,Alexander,Female)
+(7,Kathleen,Reed,Female)
+(8,Todd,Scott,Male)
+(9,Sharon,Mccoy,Female)
+(10,Evelyn,Rice,Female)
+
+X = FILTER A BY id IN (4, 6);
+DUMP X;
+(4,Kimberly,Morrison,Female)
+(6,Ann,Alexander,Female)
+</source>
+   </section>
+
+<p>In this example the IDs are biginteger values and the NOT operator negates the IN clause, keeping only the records whose ID is not in the list.</p>
+<source>
+A = load 'data' using PigStorage(',') AS (id:biginteger, first:chararray, last:chararray, gender:chararray);
+X = FILTER A BY NOT id IN (1, 3, 5, 7, 9);
+DUMP X;
+
+(2,Sara,Hansen,Female)
+(4,Kimberly,Morrison,Female)
+(6,Ann,Alexander,Female)
+(8,Todd,Scott,Male)
+(10,Evelyn,Rice,Female)
+</source>
+</section>
+
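To see the new operator in action from embedded Java rather than Grunt, a PigServer session in local mode can run the same filter. This is a sketch; the 'data' file is assumed to exist with the comma-separated records shown above.

    import java.util.Iterator;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class InOperatorDemo {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.registerQuery("A = LOAD 'data' USING PigStorage(',')"
                    + " AS (id:int, first:chararray, last:chararray, gender:chararray);");
            // id IN (4, 6) is equivalent to (id == 4) OR (id == 6)
            pig.registerQuery("X = FILTER A BY id IN (4, 6);");
            Iterator<Tuple> it = pig.openIterator("X");
            while (it.hasNext()) {
                System.out.println(it.next()); // (4,Kimberly,...) and (6,Ann,...)
            }
        }
    }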
 <!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
 <section id="count-star">
 <title>COUNT_STAR</title>

@@ -1442,7 +1726,7 @@ DUMP X;
 <source>
 {code}
 A = LOAD 'data' AS (f1:chararray);
-B = FOREACH A TOKENIZE (f1,'||');
+B = FOREACH A GENERATE TOKENIZE (f1,'||');
 DUMP B;
 {code}
 </source>
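The TOKENIZE fix above matters because TOKENIZE is an eval function that returns a bag, so it is only legal inside a GENERATE clause. A direct call from Java makes the return type visible; this sketch assumes the stock builtin and its optional custom-delimiter argument.

    import org.apache.pig.builtin.TOKENIZE;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class TokenizeDemo {
        public static void main(String[] args) throws Exception {
            TupleFactory tf = TupleFactory.getInstance();
            Tuple input = tf.newTuple(2);
            input.set(0, "a||b||c");
            input.set(1, "||"); // custom delimiter, as in TOKENIZE(f1,'||')
            DataBag words = new TOKENIZE().exec(input);
            System.out.println(words); // {(a),(b),(c)} -- a bag of single-field tuples
        }
    }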
Added: pig/branches/spark/src/docs/src/documentation/content/xdocs/v_editors.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/v_editors.xml?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/v_editors.xml (added)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/v_editors.xml Wed Feb 22 09:43:41 2017
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN" "http://forrest.apache.org/dtd/document-v20.dtd">
+
+<document>
+  <header>
+    <title>Visual Editors</title>
+  </header>
+  <body>
+
+    <!-- ====================================================================== -->
+    <!-- v_editors-->
+    <section id="zeppelin">
+      <title>Running Pig in Apache Zeppelin</title>
+
+      <p><a href="https://zeppelin.apache.org/">Apache Zeppelin</a> is a web-based notebook that enables interactive data analytics. Pig is supported as a backend interpreter in Zeppelin starting from version 0.7.</p>
+      <p>You can do everything in Zeppelin that you can do in the Grunt shell. In addition, you can take advantage of Zeppelin's visualization features to visualize the Pig output.
The following links describe how to configure Pig in Zeppelin and how to run Pig scripts in Zeppelin.</p>
+
+      <ul>
+        <li>
+          <a href="https://zeppelin.apache.org/docs/latest/interpreter/pig.html">https://zeppelin.apache.org/docs/latest/interpreter/pig.html</a>
+        </li>
+        <li>
+          <a href="https://cwiki.apache.org/confluence/display/ZEPPELIN/Running+Pig+in+Apache+Zeppelin">https://cwiki.apache.org/confluence/display/ZEPPELIN/Running+Pig+in+Apache+Zeppelin</a>
+        </li>
+      </ul>
+
+      <p><strong>Screenshot of running Pig in Zeppelin</strong></p>
+      <p><img alt="Pig in zeppelin" src="images/pig_zeppelin.png" title="Pig in zeppelin"></img></p>
+
+    </section>
+  </body>
+</document>
\ No newline at end of file

Modified: pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java (original)
+++ pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java Wed Feb 22 09:43:41 2017
@@ -34,10 +34,10 @@ public class CounterBasedErrorHandler im
 
     public CounterBasedErrorHandler() {
         Configuration conf = UDFContext.getUDFContext().getJobConf();
-        this.minErrors = conf.getLong(PigConfiguration.PIG_ERRORS_MIN_RECORDS,
+        this.minErrors = conf.getLong(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS,
                 0);
         this.errorThreshold = conf.getFloat(
-                PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT, 0.0f);
+                PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT, 0.0f);
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/EvalFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/EvalFunc.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/EvalFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/EvalFunc.java Wed Feb 22 09:43:41 2017
@@ -369,4 +369,17 @@ public abstract class EvalFunc<T>  {
     public void setEndOfAllInput(boolean endOfAllInput) {
     }
 
+
+    /**
+     * This will be called on both the front end and the back
+     * end during execution.
+     * @return the {@link LoadCaster} associated with this eval. Returning null
+     * indicates that casts from bytearray will use the caster associated with
+     * the input parameters, when they all come from the same LoadCaster type.
+     * @throws IOException if there is an exception while creating the LoadCaster
+     */
+    public LoadCaster getLoadCaster() throws IOException {
+        return null;
+    }
+
 }

Modified: pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java (original)
+++ pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java Wed Feb 22 09:43:41 2017
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -47,6 +48,7 @@ public class JVMReuseImpl {
         PigGenericMapReduce.staticDataCleanup();
         PigStatusReporter.staticDataCleanup();
         PigCombiner.Combine.staticDataCleanup();
+        DistinctCombiner.Combine.staticDataCleanup();
 
         String className = null;
         String msg = null;

Modified: pig/branches/spark/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/LoadFunc.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/LoadFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/LoadFunc.java Wed Feb 22 09:43:41 2017
@@ -108,7 +108,7 @@ public abstract class LoadFunc {
     public abstract InputFormat getInputFormat() throws IOException;
 
     /**
-     * This will be called on the front end during planning and not on the back
+     * This will be called on both the front end and the back
      * end during execution.
     * @return the {@link LoadCaster} associated with this loader. Returning null
     * indicates that casts from byte array are not supported for this loader.
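The EvalFunc, LoadFunc, and StreamToPig javadoc updates in this commit all record the same behavioral point: getLoadCaster is now consulted on the back end as well, so it should be cheap and deterministic. The following is a minimal sketch of an EvalFunc that opts into bytearray casts, assuming the stock Utf8StorageConverter is an acceptable caster; the class itself is illustrative and not part of the patch.

    import java.io.IOException;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.LoadCaster;
    import org.apache.pig.builtin.Utf8StorageConverter;
    import org.apache.pig.data.Tuple;

    public class UpperCase extends EvalFunc<String> {
        @Override
        public String exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0 || input.get(0) == null) {
                return null;
            }
            return input.get(0).toString().toUpperCase();
        }

        @Override
        public LoadCaster getLoadCaster() throws IOException {
            // Called on both the front end and the back end now,
            // so avoid relying on any front-end-only state here.
            return new Utf8StorageConverter();
        }
    }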
Modified: pig/branches/spark/src/org/apache/pig/Main.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/Main.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/Main.java (original) +++ pig/branches/spark/src/org/apache/pig/Main.java Wed Feb 22 09:43:41 2017 @@ -27,7 +27,6 @@ import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.io.OutputStreamWriter; import java.io.Reader; import java.io.StringReader; import java.net.URL; @@ -45,9 +44,8 @@ import java.util.jar.Attributes; import java.util.jar.JarFile; import java.util.jar.Manifest; -import jline.ConsoleReader; -import jline.ConsoleReaderInputStream; -import jline.History; +import jline.console.ConsoleReader; +import jline.console.history.FileHistory; import org.antlr.runtime.RecognitionException; import org.apache.commons.logging.Log; @@ -59,6 +57,7 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import org.apache.pig.PigRunner.ReturnCode; +import org.apache.pig.backend.BackendException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.classification.InterfaceAudience; @@ -76,6 +75,7 @@ import org.apache.pig.parser.DryRunGrunt import org.apache.pig.scripting.ScriptEngine; import org.apache.pig.scripting.ScriptEngine.SupportedScriptLang; import org.apache.pig.tools.cmdline.CmdLineParser; +import org.apache.pig.tools.grunt.ConsoleReaderInputStream; import org.apache.pig.tools.grunt.Grunt; import org.apache.pig.tools.pigstats.PigProgressNotificationListener; import org.apache.pig.tools.pigstats.PigStats; @@ -100,13 +100,12 @@ import com.google.common.io.Closeables; public class Main { static { - Runtime.getRuntime().addShutdownHook(new Thread() { - + Utils.addShutdownHookWithPriority(new Runnable() { @Override public void run() { FileLocalizer.deleteTempResourceFiles(); } - }); + }, PigImplConstants.SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY); } private final static Log log = LogFactory.getLog(Main.class); @@ -477,7 +476,7 @@ public class Main { } - logFileName = validateLogFile(logFileName, file); + logFileName = validateLogFile(logFileName, localFileRet.file); pigContext.getProperties().setProperty("pig.logfile", (logFileName == null? 
"": logFileName)); // Set job name based on name of the script @@ -488,7 +487,7 @@ public class Main { new File(substFile).deleteOnExit(); } - scriptState.setScript(new File(file)); + scriptState.setScript(localFileRet.file); grunt = new Grunt(pin, pigContext); gruntCalled = true; @@ -551,12 +550,13 @@ public class Main { } // Interactive mode = ExecMode.SHELL; - //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available - ConsoleReader reader = new ConsoleReader(Utils.getCompositeStream(System.in, properties), new OutputStreamWriter(System.out)); - reader.setDefaultPrompt("grunt> "); + //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available + ConsoleReader reader = new ConsoleReader(Utils.getCompositeStream(System.in, properties), System.out); + reader.setExpandEvents(false); + reader.setPrompt("grunt> "); final String HISTORYFILE = ".pig_history"; String historyFile = System.getProperty("user.home") + File.separator + HISTORYFILE; - reader.setHistory(new History(new File(historyFile))); + reader.setHistory(new FileHistory(new File(historyFile))); ConsoleReaderInputStream inputStream = new ConsoleReaderInputStream(reader); grunt = new Grunt(new BufferedReader(new InputStreamReader(inputStream)), pigContext); grunt.setConsoleReader(reader); @@ -605,7 +605,7 @@ public class Main { return ReturnCode.SUCCESS; } - logFileName = validateLogFile(logFileName, remainders[0]); + logFileName = validateLogFile(logFileName, localFileRet.file); pigContext.getProperties().setProperty("pig.logfile", (logFileName == null? "": logFileName)); if (!debug) { @@ -660,6 +660,7 @@ public class Main { if(!gruntCalled) { LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched"); } + killRunningJobsIfInterrupted(e, pigContext); } catch (Throwable e) { rc = ReturnCode.THROWABLE_EXCEPTION; PigStatsUtil.setErrorMessage(e.getMessage()); @@ -668,6 +669,7 @@ public class Main { if(!gruntCalled) { LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched"); } + killRunningJobsIfInterrupted(e, pigContext); } finally { if (printScriptRunTime) { printScriptRunTime(startTime); @@ -694,6 +696,22 @@ public class Main { + " (" + duration.getMillis() + " ms)"); } + private static void killRunningJobsIfInterrupted(Throwable e, PigContext pigContext) { + Throwable cause = e.getCause(); + // Kill running job when we get InterruptedException + // Pig thread is interrupted by mapreduce when Oozie launcher job is killed + // Shutdown hook kills running jobs, but sometimes NodeManager can issue + // a SIGKILL after AM unregisters and before shutdown hook gets to execute + // causing orphaned jobs that continue to run. 
+        if (e instanceof InterruptedException || (cause != null && cause instanceof InterruptedException)) {
+            try {
+                pigContext.getExecutionEngine().kill();
+            } catch (BackendException be) {
+                log.error("Error while killing running jobs", be);
+            }
+        }
+    }
+
     protected static PigProgressNotificationListener makeListener(Properties properties) {
 
         try {
@@ -971,11 +989,10 @@ public class Main {
         System.out.println("Additionally, any Hadoop property can be specified.");
     }
 
-    private static String validateLogFile(String logFileName, String scriptName) {
+    private static String validateLogFile(String logFileName, File scriptFile) {
         String strippedDownScriptName = null;
 
-        if(scriptName != null) {
-            File scriptFile = new File(scriptName);
+        if (scriptFile != null) {
            if(!scriptFile.isDirectory()) {
                String scriptFileAbsPath;
                try {

Added: pig/branches/spark/src/org/apache/pig/NonFSLoadFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/NonFSLoadFunc.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/NonFSLoadFunc.java (added)
+++ pig/branches/spark/src/org/apache/pig/NonFSLoadFunc.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,25 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.pig;
+
+/**
+ * Marker interface to distinguish LoadFunc implementations that don't use file system sources.
+ */
+public interface NonFSLoadFunc {
+
+}

Modified: pig/branches/spark/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigConfiguration.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigConfiguration.java Wed Feb 22 09:43:41 2017
@@ -18,6 +18,7 @@
 package org.apache.pig;
 
+
 /**
  * Container for static configuration strings, defaults, etc. This is intended just for keys that can
  * be set by users, not for keys that are generally used within pig.
@@ -62,9 +63,15 @@ public class PigConfiguration {
     public static final String PIG_TEZ_OPT_UNION = "pig.tez.opt.union";
     /**
      * These keys are used to enable or disable tez union optimization for
-     * specific StoreFuncs so that optimization is only applied to StoreFuncs
-     * that do not hard part file names and honor mapreduce.output.basename and
-     * is turned of for those that do not. Refer PIG-4649
+     * specific StoreFuncs. Optimization should be turned off for those
+     * StoreFuncs that hard code part file names and do not prefix file names
+     * with the mapreduce.output.basename configuration.
+     *
+     * If a StoreFunc implements
+     * {@link StoreFunc#supportsParallelWriteToStoreLocation()} and returns true
+     * or false, then that is used to turn union optimization on or off
+     * respectively. These settings can be used for StoreFuncs that have not
+     * implemented the API yet.
      */
     public static final String PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS = "pig.tez.opt.union.supported.storefuncs";
     public static final String PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS = "pig.tez.opt.union.unsupported.storefuncs";
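Rather than being listed in these properties, a store function can answer the question for itself through the new StoreFunc API that appears later in this commit. A sketch of a minimal text storer that opts in; the class and its choice of output format are illustrative, not part of the patch.

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.pig.StoreFunc;
    import org.apache.pig.data.Tuple;

    public class SimpleTextStorer extends StoreFunc {
        private RecordWriter<NullWritable, Text> writer;

        @Override
        public OutputFormat getOutputFormat() {
            // FileOutputFormat names part files using mapreduce.output.basename,
            // so tasks of different union vertices cannot collide.
            return new TextOutputFormat<NullWritable, Text>();
        }

        @Override
        public void setStoreLocation(String location, Job job) throws IOException {
            FileOutputFormat.setOutputPath(job, new Path(location));
        }

        @Override
        @SuppressWarnings("unchecked")
        public void prepareToWrite(RecordWriter writer) {
            this.writer = writer;
        }

        @Override
        public void putNext(Tuple t) throws IOException {
            try {
                writer.write(NullWritable.get(), new Text(t.toDelimitedString("\t")));
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
        }

        @Override
        public Boolean supportsParallelWriteToStoreLocation() {
            return Boolean.TRUE; // safe for the Tez union optimization
        }
    }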
@@ -127,6 +134,58 @@ public class PigConfiguration {
     public static final String PIG_SKEWEDJOIN_REDUCE_MEM = "pig.skewedjoin.reduce.mem";
 
     /**
+     * Bloom join has two different kinds of implementations.
+     * <ul>
+     * <li>map <br>
+     * In each map, bloom filters are computed on the join keys partitioned by
+     * the hashcode of the key with {@link #PIG_BLOOMJOIN_NUM_FILTERS} number of
+     * partitions. Bloom filters from different maps are then combined in the
+     * reducer, producing one bloom filter per partition. This is efficient and
+     * fast if there is a smaller number of maps (<10) and the number of
+     * distinct keys is not too high. It can be faster with a larger number of
+     * maps and even with bigger bloom vector sizes, but the amount of data
+     * shuffled to the reducer for aggregation becomes huge, making it
+     * inefficient.</li>
+     * <li>reduce <br>
+     * Join keys are sent from the map to the reducer partitioned by hashcode of
+     * the key with {@link #PIG_BLOOMJOIN_NUM_FILTERS} number of reducers. One
+     * bloom filter is then created per partition. This is efficient for larger
+     * datasets with a lot of maps or a very large
+     * {@link #PIG_BLOOMJOIN_VECTORSIZE_BYTES}. In this case the size of the keys
+     * sent to the reducer is smaller than sending bloom filters to the reducer
+     * for aggregation, making it efficient.</li>
+     * </ul>
+     * Default value is map.
+     */
+    public static final String PIG_BLOOMJOIN_STRATEGY = "pig.bloomjoin.strategy";
+
+    /**
+     * The number of bloom filters that will be created.
+     * Default is 1 for map strategy and 11 for reduce strategy.
+     */
+    public static final String PIG_BLOOMJOIN_NUM_FILTERS = "pig.bloomjoin.num.filters";
+
+    /**
+     * The size in bytes of the bit vector to be used for the bloom filter.
+     * A bigger vector size will be needed when the number of distinct keys is higher.
+     * Default value is 1048576 (1MB).
+     */
+    public static final String PIG_BLOOMJOIN_VECTORSIZE_BYTES = "pig.bloomjoin.vectorsize.bytes";
+
+    /**
+     * The type of hash function to use. Valid values are jenkins and murmur.
+     * Default is murmur.
+     */
+    public static final String PIG_BLOOMJOIN_HASH_TYPE = "pig.bloomjoin.hash.type";
+
+    /**
+     * The number of hash functions to be used in bloom computation. It determines
+     * the probability of false positives. The higher the number, the lower the
+     * false positives. Too high a value can increase the cpu time.
+     * Default value is 3.
+     */
+    public static final String PIG_BLOOMJOIN_HASH_FUNCTIONS = "pig.bloomjoin.hash.functions";
+
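Taken together, the keys above give a per-script tuning surface for bloom join. A sketch of plausible settings follows; the values are illustrative, not recommendations, and bloom join itself is requested per join in the script with USING 'bloom'.

    import java.util.Properties;
    import org.apache.pig.PigConfiguration;

    public class BloomJoinSettings {
        public static Properties bloomTuning() {
            Properties props = new Properties();
            // "reduce" strategy: better with many maps or many distinct keys.
            props.setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
            // Grow the bit vector from the default 1MB to 4MB for a larger keyspace.
            props.setProperty(PigConfiguration.PIG_BLOOMJOIN_VECTORSIZE_BYTES, "4194304");
            props.setProperty(PigConfiguration.PIG_BLOOMJOIN_HASH_TYPE, "murmur");
            props.setProperty(PigConfiguration.PIG_BLOOMJOIN_HASH_FUNCTIONS, "3");
            // Script side: C = JOIN big BY k, small BY k USING 'bloom';
            return props;
        }
    }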
+    /**
     * This key used to control the maximum size loaded into
     * the distributed cache when doing fragment-replicated join
     */
@@ -151,6 +210,12 @@ public class PigConfiguration {
     * This key is used to configure grace parallelism in tez. Default is true.
     */
    public static final String PIG_TEZ_GRACE_PARALLELISM = "pig.tez.grace.parallelism";
+    /**
+     * This key is used to turn off DAG recovery if there is auto parallelism.
+     * Default is false. Useful when running with Tez versions before Tez 0.8,
+     * which have issues with auto parallelism during DAG recovery.
+     */
+    public static final String PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY = "pig.tez.auto.parallelism.disable.dag.recovery";
 
     /**
      * This key is used to configure compression for the pig input splits which
@@ -324,17 +389,17 @@ public class PigConfiguration {
     /**
      * Boolean value used to enable or disable error handling for storers
      */
-    public static final String PIG_ALLOW_STORE_ERRORS = "pig.allow.store.errors";
+    public static final String PIG_ERROR_HANDLING_ENABLED = "pig.error-handling.enabled";
 
     /**
      * Controls the minimum number of errors
      */
-    public static final String PIG_ERRORS_MIN_RECORDS = "pig.errors.min.records";
+    public static final String PIG_ERROR_HANDLING_MIN_ERROR_RECORDS = "pig.error-handling.min.error.records";
 
     /**
      * Set the threshold for percentage of errors
      */
-    public static final String PIG_ERROR_THRESHOLD_PERCENT = "pig.error.threshold.percent";
+    public static final String PIG_ERROR_HANDLING_THRESHOLD_PERCENT = "pig.error-handling.error.threshold";
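The renamed error-handling keys are read by CounterBasedErrorHandler, updated earlier in this commit. A sketch of how a job might tolerate a small number of bad records using the new names; the thresholds are illustrative.

    import java.util.Properties;
    import org.apache.pig.PigConfiguration;

    public class ErrorHandlingSettings {
        public static Properties tolerateSomeBadRecords() {
            Properties props = new Properties();
            // Turn on error handling for storers.
            props.setProperty(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, "true");
            // Do not fail until at least this many records have errored out...
            props.setProperty(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS, "100");
            // ...and beyond that, only when the error percentage passes the threshold.
            props.setProperty(PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT, "0.5");
            return props;
        }
    }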
 
     /**
      * Comma-delimited entries of commands/operators that must be disallowed.
@@ -411,6 +476,31 @@ public class PigConfiguration {
      */
     public static final String PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE = "pig.spill.unused.memory.threshold.size";
 
+    /**
+     * Log tracing id that can be used by upstream clients for tracking respective logs
+     */
+    public static final String PIG_LOG_TRACE_ID = "pig.log.trace.id";
+
+    /**
+     * @deprecated use {@link #PIG_LOG_TRACE_ID} instead. Will be removed in Pig 0.18
+     */
+    public static final String CALLER_ID = PIG_LOG_TRACE_ID;
+
+    /**
+     * Enable ATS for Pig
+     */
+    public static final String PIG_ATS_ENABLED = "pig.ats.enabled";
+
+    /**
+     * @deprecated use {@link #PIG_ATS_ENABLED} instead. Will be removed in Pig 0.18
+     */
+    public static final String ENABLE_ATS = PIG_ATS_ENABLED;
+
+    /**
+     * If set, Pig will override tez.am.launch.cmd-opts and tez.am.resource.memory.mb to optimal values
+     */
+    public static final String PIG_TEZ_CONFIGURE_AM_MEMORY = "pig.tez.configure.am.memory";
+
     // Deprecated settings of Pig 0.13
 
     /**

Modified: pig/branches/spark/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigServer.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigServer.java Wed Feb 22 09:43:41 2017
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.io.StringReader;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -53,6 +55,7 @@ import org.apache.pig.backend.datastorag
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.backend.hadoop.PigATSClient;
 import org.apache.pig.backend.hadoop.executionengine.HJob;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.classification.InterfaceAudience;
@@ -241,6 +244,54 @@ public class PigServer {
         }
         PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
 
+        // log an ATS event that includes the caller context
+        String auditId = PigATSClient.getPigAuditId(pigContext);
+        String callerId = (String)pigContext.getProperties().get(PigConfiguration.PIG_LOG_TRACE_ID);
+        log.info("Pig Script ID for the session: " + auditId);
+        if (callerId != null) {
+            log.info("Caller ID for session: " + callerId);
+        }
+        if (Boolean.parseBoolean(pigContext.getProperties()
+                .getProperty(PigConfiguration.PIG_ATS_ENABLED))) {
+            if (Boolean.parseBoolean(pigContext.getProperties()
+                    .getProperty("yarn.timeline-service.enabled", "false"))) {
+                PigATSClient.ATSEvent event = new PigATSClient.ATSEvent(auditId, callerId);
+                try {
+                    PigATSClient.getInstance().logEvent(event);
+                } catch (Exception e) {
+                    log.warn("Error posting to ATS: ", e);
+                }
+            } else {
+                log.warn("ATS is disabled since"
+                        + " yarn.timeline-service.enabled set to false");
+            }
+
+        }
+
+        // set hdfs caller context
+        Class callerContextClass = null;
+        try {
+            callerContextClass = Class.forName("org.apache.hadoop.ipc.CallerContext");
+        } catch (ClassNotFoundException e) {
+            // If pre-Hadoop 2.8.0, skip setting CallerContext
+        }
+        if (callerContextClass != null) {
+            try {
+                // Reflection for the following code since it is only available since hadoop 2.8.0:
+                // CallerContext hdfsContext = new CallerContext.Builder(auditId).build();
+                // CallerContext.setCurrent(hdfsContext);
+                Class callerContextBuilderClass = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder");
+                Constructor callerContextBuilderConstruct = callerContextBuilderClass.getConstructor(String.class);
+                Object builder = callerContextBuilderConstruct.newInstance(auditId);
+                Method builderBuildMethod = builder.getClass().getMethod("build");
+                Object hdfsContext = builderBuildMethod.invoke(builder);
+                Method callerContextSetCurrentMethod = callerContextClass.getMethod("setCurrent", hdfsContext.getClass());
+                callerContextSetCurrentMethod.invoke(callerContextClass, hdfsContext);
+            } catch (Exception e) {
+                // Should not happen unless the API changes in a future Hadoop Common release
+                throw new ExecException(e);
+            }
+        }
     }
 
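The reflective block above exists only so Pig keeps compiling against pre-2.8.0 Hadoop; its own comment spells out the direct form. For reference, the equivalent when Hadoop 2.8.0+ can be assumed at compile time:

    import org.apache.hadoop.ipc.CallerContext;

    public class HdfsCallerContextExample {
        public static void tagRequests(String auditId) {
            // Direct (non-reflective) equivalent of the reflection above.
            CallerContext hdfsContext = new CallerContext.Builder(auditId).build();
            CallerContext.setCurrent(hdfsContext);
        }
    }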
     private void addHadoopProperties() throws ExecException {
@@ -612,7 +663,8 @@ public class PigServer {
             pigContext.scriptingUDFs.put(path, namespace);
         }
 
-        File f = FileLocalizer.fetchFile(pigContext.getProperties(), path).file;
+        FetchFileRet ret = FileLocalizer.fetchFile(pigContext.getProperties(), path);
+        File f = ret.file;
         if (!f.canRead()) {
             int errCode = 4002;
             String msg = "Can't read file: " + path;
@@ -621,9 +673,19 @@ public class PigServer {
         }
         String cwd = new File(".").getCanonicalPath();
         String filePath = f.getCanonicalPath();
-        //Use the relative path in the jar, if the path specified is relative
-        String nameInJar = filePath.equals(cwd + File.separator + path) ?
-                filePath.substring(cwd.length() + 1) : filePath;
+        String nameInJar = filePath;
+        // Use the relative path in the jar, if the path specified is relative
+        if (!ret.didFetch) {
+            if (!new File(path).isAbsolute() && path.indexOf("." + File.separator) == -1) {
+                // In case of Oozie, the localized files are in a different
+                // directory symlinked to the current directory. Canonical path will not point to cwd.
+                nameInJar = path;
+            } else if (filePath.equals(cwd + File.separator + path)) {
+                // If user specified absolute path and it refers to cwd
+                nameInJar = filePath.substring(cwd.length() + 1);
+            }
+        }
+
         pigContext.addScriptFile(nameInJar, filePath);
 
         if(scriptingLang != null) {
             ScriptEngine se = ScriptEngine.getInstance(scriptingLang);

Modified: pig/branches/spark/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/StoreFunc.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/StoreFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/StoreFunc.java Wed Feb 22 09:43:41 2017
@@ -21,17 +21,14 @@ import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
-
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 
 /**
@@ -45,21 +42,21 @@ public abstract class StoreFunc implemen
     /**
      * This method is called by the Pig runtime in the front end to convert the
      * output location to an absolute path if the location is relative. The
-     * StoreFunc implementation is free to choose how it converts a relative
+     * StoreFunc implementation is free to choose how it converts a relative
      * location to an absolute location since this may depend on what the location
-     * string represent (hdfs path or some other data source).
-     *
-     *
+     * string represents (hdfs path or some other data source).
+     *
+     *
     * @param location location as provided in the "store" statement of the script
    * @param curDir the current working direction based on any "cd" statements
    * in the script before the "store" statement.
If there are no "cd" statements - * in the script, this would be the home directory - + * in the script, this would be the home directory - * <pre>/user/<username> </pre> * @return the absolute location based on the arguments passed * @throws IOException if the conversion is not possible */ @Override - public String relToAbsPathForStoreLocation(String location, Path curDir) + public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException { return LoadFunc.getAbsolutePath(location, curDir); } @@ -67,32 +64,34 @@ public abstract class StoreFunc implemen /** * Return the OutputFormat associated with StoreFunc. This will be called * on the front end during planning and on the backend during - * execution. + * execution. * @return the {@link OutputFormat} associated with StoreFunc - * @throws IOException if an exception occurs while constructing the + * @throws IOException if an exception occurs while constructing the * OutputFormat * */ + @Override public abstract OutputFormat getOutputFormat() throws IOException; /** - * Communicate to the storer the location where the data needs to be stored. - * The location string passed to the {@link StoreFunc} here is the - * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} + * Communicate to the storer the location where the data needs to be stored. + * The location string passed to the {@link StoreFunc} here is the + * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} * This method will be called in the frontend and backend multiple times. Implementations * should bear in mind that this method is called multiple times and should * ensure there are no inconsistent side effects due to the multiple calls. * {@link #checkSchema(ResourceSchema)} will be called before any call to * {@link #setStoreLocation(String, Job)}. - * + * - * @param location Location returned by + * @param location Location returned by * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} * @param job The {@link Job} object * @throws IOException if the location is not valid. */ + @Override public abstract void setStoreLocation(String location, Job job) throws IOException; - + /** * Set the schema for data to be stored. This will be called on the * front end during planning if the store is associated with a schema. @@ -117,21 +116,23 @@ public abstract class StoreFunc implemen * @param writer RecordWriter to use. * @throws IOException if an exception occurs during initialization */ + @Override public abstract void prepareToWrite(RecordWriter writer) throws IOException; /** * Write a tuple to the data store. - * + * * @param t the tuple to store. * @throws IOException if an exception occurs during the write */ + @Override public abstract void putNext(Tuple t) throws IOException; - + /** * This method will be called by Pig both in the front end and back end to * pass a unique signature to the {@link StoreFunc} which it can use to store * information in the {@link UDFContext} which it needs to store between - * various method invocations in the front end and back end. This method + * various method invocations in the front end and back end. This method * will be called before other methods in {@link StoreFunc}. 
This is necessary * because in a Pig Latin script with multiple stores, the different * instances of store functions need to be able to find their (and only their) @@ -142,21 +143,21 @@ public abstract class StoreFunc implemen public void setStoreFuncUDFContextSignature(String signature) { // default implementation is a no-op } - + /** * This method will be called by Pig if the job which contains this store * fails. Implementations can clean up output locations in this method to * ensure that no incorrect/incomplete results are left in the output location. * The default implementation deletes the output location if it * is a {@link FileSystem} location. - * @param location Location returned by + * @param location Location returned by * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} - * @param job The {@link Job} object - this should be used only to obtain + * @param job The {@link Job} object - this should be used only to obtain * cluster properties through {@link Job#getConfiguration()} and not to set/query - * any runtime job information. + * any runtime job information. */ @Override - public void cleanupOnFailure(String location, Job job) + public void cleanupOnFailure(String location, Job job) throws IOException { cleanupOnFailureImpl(location, job); } @@ -166,19 +167,19 @@ public abstract class StoreFunc implemen * is successful, and some cleanup of intermediate resources is required. * Implementations can clean up output locations in this method to * ensure that no incorrect/incomplete results are left in the output location. - * @param location Location returned by + * @param location Location returned by * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} - * @param job The {@link Job} object - this should be used only to obtain + * @param job The {@link Job} object - this should be used only to obtain * cluster properties through {@link Job#getConfiguration()} and not to set/query - * any runtime job information. + * any runtime job information. */ @Override - public void cleanupOnSuccess(String location, Job job) + public void cleanupOnSuccess(String location, Job job) throws IOException { // DEFAULT: DO NOTHING, user-defined overrides can // call cleanupOnFailureImpl(location, job) or ...? } - + /** * Default implementation for {@link #cleanupOnFailure(String, Job)} * and {@link #cleanupOnSuccess(String, Job)}. This removes a file @@ -187,15 +188,56 @@ public abstract class StoreFunc implemen * @param job Hadoop job, used to access the appropriate file system. * @throws IOException */ - public static void cleanupOnFailureImpl(String location, Job job) - throws IOException { + public static void cleanupOnFailureImpl(String location, Job job) + throws IOException { Path path = new Path(location); FileSystem fs = path.getFileSystem(job.getConfiguration()); if(fs.exists(path)){ fs.delete(path, true); - } + } + } + + // TODO When dropping support for JDK 7 move this as a default method to StoreFuncInterface + /** + * DAG execution engines like Tez support optimizing union by writing to + * output location in parallel from tasks of different vertices. Commit is + * called once all the vertices in the union are complete. This eliminates + * need to have a separate phase to read data output from previous phases, + * union them and write out again. + * + * Enabling the union optimization requires the OutputFormat to + * + * 1) Support creation of different part file names for tasks of different + * vertices. 
Conflicting filenames can create data corruption and loss.
+     * For example: if task 0 of vertex1 and vertex2 both create the filename
+     * part-r-00000, then one of the files will be overwritten when promoting
+     * from the temporary to the final location, leading to data loss.
+     * FileOutputFormat has the mapreduce.output.basename config which enables
+     * naming files differently in different vertices. Classes extending
+     * FileOutputFormat and those prefixing file names with the
+     * mapreduce.output.basename value will not encounter conflicts. Cases like
+     * HBaseStorage, which writes to a key-value store and does not produce
+     * files, also should not face any conflict.
+     *
+     * 2) Support calling commit once at the end to take care of promoting
+     * temporary files of the different vertices into the final location.
+     * For example: the FileOutputFormat commit algorithm handles promoting
+     * files produced by tasks of different vertices into the final output
+     * location without issues if there is no file name conflict. In cases like
+     * HBaseStorage, the TableOutputCommitter does nothing on commit.
+     *
+     * If the custom OutputFormat used by the StoreFunc does not support the
+     * above two criteria, then false should be returned. Union optimization
+     * will be disabled for the StoreFunc.
+     *
+     * The default implementation returns null, and in that case the planner
+     * falls back to {@link PigConfiguration#PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS}
+     * and {@link PigConfiguration#PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS}
+     * settings to determine if the StoreFunc supports it.
+     */
+    public Boolean supportsParallelWriteToStoreLocation() {
+        return null;
     }
-    
+
     /**
      * Issue a warning.  Warning messages are aggregated and reported to
      * the user.

Modified: pig/branches/spark/src/org/apache/pig/StreamToPig.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/StreamToPig.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/StreamToPig.java (original)
+++ pig/branches/spark/src/org/apache/pig/StreamToPig.java Wed Feb 22 09:43:41 2017
@@ -57,7 +57,7 @@ public interface StreamToPig {
     public Tuple deserialize(byte[] bytes) throws IOException;
 
     /**
-     * This will be called on the front end during planning and not on the back
+     * This will be called on both the front end and the back
      * end during execution.
      *
      * @return the {@link LoadCaster} associated with this object, or null if

Modified: pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Wed Feb 22 09:43:41 2017
@@ -183,6 +183,14 @@ public interface ExecutionEngine {
     public ExecutableManager getExecutableManager();
 
     /**
+     * This method is called when a user requests to kill all jobs
+     * associated with the execution engine.
+     *
+     * @throws BackendException
+     */
+    public void kill() throws BackendException;
+
+    /**
     * This method is called when a user requests to kill a job associated with
     * the given job id. If it is not possible for a user to kill a job, throw a
     * exception.
It is imperative for the job id's being displayed to be unique Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java?rev=1783988&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java Wed Feb 22 09:43:41 2017 @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop; + +import java.io.File; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; +import org.apache.pig.impl.util.Utils; +import org.apache.pig.tools.pigstats.ScriptState; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +public class PigATSClient { + public static class ATSEvent { + public ATSEvent(String pigAuditId, String callerId) { + this.pigScriptId = pigAuditId; + this.callerId = callerId; + } + String callerId; + String pigScriptId; + } + public static final String ENTITY_TYPE = "PIG_SCRIPT_ID"; + public static final String ENTITY_CALLERID = "callerId"; + public static final String CALLER_CONTEXT = "PIG"; + public static final int AUDIT_ID_MAX_LENGTH = 128; + + private static final Log log = LogFactory.getLog(PigATSClient.class.getName()); + private static PigATSClient instance; + private static ExecutorService executor; + private TimelineClient timelineClient; + + public static synchronized PigATSClient getInstance() { + if (instance==null) { + instance = new PigATSClient(); + } + return instance; + } + + private PigATSClient() { + if (executor == null) { + executor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build()); + YarnConfiguration yarnConf = new YarnConfiguration(); + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(yarnConf); + timelineClient.start(); + } + Utils.addShutdownHookWithPriority(new Runnable() { + @Override + public void run() { + timelineClient.stop(); + executor.shutdownNow(); + executor = null; + } + }, PigImplConstants.SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY); + log.info("Created ATS Hook"); + } + + public static String getPigAuditId(PigContext context) { + String auditId; + if 
(context.getProperties().get(PigImplConstants.PIG_AUDIT_ID) != null) {
+            auditId = (String)context.getProperties().get(PigImplConstants.PIG_AUDIT_ID);
+        } else {
+            ScriptState ss = ScriptState.get();
+            String filename = ss.getFileName().isEmpty() ? "default" : new File(ss.getFileName()).getName();
+            auditId = CALLER_CONTEXT + "-" + filename + "-" + ss.getId();
+        }
+        return auditId.substring(0, Math.min(auditId.length(), AUDIT_ID_MAX_LENGTH));
+    }
+
+    synchronized public void logEvent(final ATSEvent event) {
+        executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                TimelineEntity entity = new TimelineEntity();
+                entity.setEntityId(event.pigScriptId);
+                entity.setEntityType(ENTITY_TYPE);
+                entity.addPrimaryFilter(ENTITY_CALLERID, event.callerId != null ? event.callerId : "default");
+                try {
+                    timelineClient.putEntities(entity);
+                } catch (Exception e) {
+                    log.info("Failed to submit plan to ATS: " + e.getMessage());
+                }
+            }
+        });
+    }
+}
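PigServer, earlier in this commit, is the one caller of this client; the whole interaction reduces to the two calls below. A sketch of the same sequence for an embedding application:

    import org.apache.pig.PigConfiguration;
    import org.apache.pig.backend.hadoop.PigATSClient;
    import org.apache.pig.impl.PigContext;

    public class AtsLoggingExample {
        public static void logScriptStart(PigContext pigContext) {
            String auditId = PigATSClient.getPigAuditId(pigContext);
            String callerId = (String) pigContext.getProperties()
                    .get(PigConfiguration.PIG_LOG_TRACE_ID);
            // logEvent is asynchronous: the entity is posted from a daemon
            // thread, so an unreachable timeline server never blocks the script.
            PigATSClient.getInstance().logEvent(
                    new PigATSClient.ATSEvent(auditId, callerId));
        }
    }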

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
+
+/**
+ * Extends the hadoop JobControl to remove the hardcoded sleep(5000).
+ * As most of this is private, we have to use reflection.
+ *
+ * See {@link https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java }
+ *
+ */
+public class PigJobControl extends JobControl {
+  private static final Log log = LogFactory.getLog(PigJobControl.class);
+
+  private static Field runnerState;
+  private static Field jobsInProgress;
+  private static Field successfulJobs;
+  private static Field failedJobs;
+
+  private static Method failAllJobs;
+
+  private static Method checkState;
+  private static Method submit;
+
+  private static boolean initSuccesful;
+
+  static {
+    try {
+
+      runnerState = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("runnerState");
+      runnerState.setAccessible(true);
+      jobsInProgress = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("jobsInProgress");
+      jobsInProgress.setAccessible(true);
+      successfulJobs = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("successfulJobs");
+      successfulJobs.setAccessible(true);
+      failedJobs = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("failedJobs");
+      failedJobs.setAccessible(true);
+
+      failAllJobs = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredMethod("failAllJobs", Throwable.class);
+      failAllJobs.setAccessible(true);
+
+      checkState = ControlledJob.class.getDeclaredMethod("checkState");
+      checkState.setAccessible(true);
+      submit = ControlledJob.class.getDeclaredMethod("submit");
+      submit.setAccessible(true);
+
+      initSuccesful = true;
+    } catch (Exception e) {
+      log.debug("falling back to default JobControl (not using hadoop 0.23 ?)", e);
+      initSuccesful = false;
+    }
+  }
+
+  protected int timeToSleep;
+
+  /**
+   * Construct a job control for a group of jobs.
+   * @param groupName a name identifying this group
+   * @param timeToSleep the time in milliseconds to sleep between job status checks
+   */
+  public PigJobControl(String groupName, int timeToSleep) {
+    super(groupName);
+    this.timeToSleep = timeToSleep;
+  }
+
+  public int getTimeToSleep() {
+    return timeToSleep;
+  }
+
+  public void setTimeToSleep(int timeToSleep) {
+    this.timeToSleep = timeToSleep;
+  }
+
+  private void setRunnerState(ThreadState state) {
+    try {
+      runnerState.set(this, state);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  private ThreadState getRunnerState() {
+    try {
+      return (ThreadState)runnerState.get(this);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private State checkState(ControlledJob j) {
+    try {
+      return (State)checkState.invoke(j);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private State submit(ControlledJob j) {
+    try {
+      return (State)submit.invoke(j);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private LinkedList<ControlledJob> getJobs(Field field) {
+    try {
+      return (LinkedList<ControlledJob>)field.get(this);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void failAllJobs(Throwable t) {
+    try {
+      failAllJobs.invoke(this, t);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * The main loop for the thread.
+   * The loop does the following:
+   *   Check the states of the running jobs
+   *   Update the states of waiting jobs
+   *   Submit the jobs in ready state
+   */
+  public void run() {
+    if (!initSuccesful) {
+      super.run();
+      return;
+    }
+    try {
+      setRunnerState(ThreadState.RUNNING);
+      while (true) {
+        while (getRunnerState() == ThreadState.SUSPENDED) {
+          try {
+            Thread.sleep(timeToSleep);
+          }
+          catch (Exception e) {
+            //TODO the thread was interrupted, do something!!!
+          }
+        }
+
+        synchronized(this) {
+          Iterator<ControlledJob> it = getJobs(jobsInProgress).iterator();
+          if (!it.hasNext()) {
+            stop();
+          }
+          while(it.hasNext()) {
+            ControlledJob j = it.next();
+
+            // TODO: Need to re-visit the following try...catch
+            // when Pig picks up a Hadoop release with MAPREDUCE-6762 applied
+            // as its dependency.
+            try {
+              log.debug("Checking state of job " + j);
+            } catch(NullPointerException npe) {
+              log.warn("Failed to get job name "
+                      + "when checking state of job. "
+                      + "Check if job status is null.", npe);
+            }
+
+            switch(checkState(j)) {
+            case SUCCESS:
+              getJobs(successfulJobs).add(j);
+              it.remove();
+              break;
+            case FAILED:
+            case DEPENDENT_FAILED:
+              getJobs(failedJobs).add(j);
+              it.remove();
+              break;
+            case READY:
+              submit(j);
+              break;
+            case RUNNING:
+            case WAITING:
+              //Do Nothing
+              break;
+            }
+          }
+        }
+
+        if (getRunnerState() != ThreadState.RUNNING &&
+            getRunnerState() != ThreadState.SUSPENDED) {
+          break;
+        }
+        try {
+          Thread.sleep(timeToSleep);
+        }
+        catch (Exception e) {
+          //TODO the thread was interrupted, do something!!!
+        }
+        if (getRunnerState() != ThreadState.RUNNING &&
+            getRunnerState() != ThreadState.SUSPENDED) {
+          break;
+        }
+      }
+    }catch(Throwable t) {
+      log.error("Error while trying to run jobs.",t);
+      //Mark all jobs as failed because we got something bad.
+ failAllJobs(t); + } + setRunnerState(ThreadState.STOPPED); + } + + +} Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java Wed Feb 22 09:43:41 2017 @@ -17,8 +17,6 @@ package org.apache.pig.backend.hadoop.accumulo; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.math.BigDecimal; import java.math.BigInteger; import java.util.Collection; @@ -303,24 +301,8 @@ public abstract class AbstractAccumuloSt */ protected void simpleUnset(Configuration conf, Map<String, String> entriesToUnset) { - try { - Method unset = conf.getClass().getMethod("unset", String.class); - - for (String key : entriesToUnset.keySet()) { - unset.invoke(conf, key); - } - } catch (NoSuchMethodException e) { - log.error("Could not invoke Configuration.unset method", e); - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - log.error("Could not invoke Configuration.unset method", e); - throw new RuntimeException(e); - } catch (IllegalArgumentException e) { - log.error("Could not invoke Configuration.unset method", e); - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - log.error("Could not invoke Configuration.unset method", e); - throw new RuntimeException(e); + for (String key : entriesToUnset.keySet()) { + conf.unset(key); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java Wed Feb 22 09:43:41 2017 @@ -22,8 +22,6 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.net.URL; import java.net.URLDecoder; import java.text.MessageFormat; @@ -42,6 +40,7 @@ import java.util.zip.ZipOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.StringUtils; import org.apache.log4j.Logger; @@ -112,7 +111,7 @@ public class Utils { // attempt to locate an existing jar for the class. String jar = findContainingJar(my_class, packagedClasses); if (null == jar || jar.isEmpty()) { - jar = getJar(my_class); + jar = JarFinder.getJar(my_class); updateMap(jar, packagedClasses); } @@ -200,41 +199,6 @@ public class Utils { } /** - * Invoke 'getJar' on a JarFinder implementation. Useful for some job - * configuration contexts (HBASE-8140) and also for testing on MRv2. First - * check if we have HADOOP-9426. Lacking that, fall back to the backport. - * - * @param my_class - * the class to find. 
- * @return a jar file that contains the class, or null. - */ - private static String getJar(Class<?> my_class) { - String ret = null; - String hadoopJarFinder = "org.apache.hadoop.util.JarFinder"; - Class<?> jarFinder = null; - try { - log.debug("Looking for " + hadoopJarFinder + "."); - jarFinder = Class.forName(hadoopJarFinder); - log.debug(hadoopJarFinder + " found."); - Method getJar = jarFinder.getMethod("getJar", Class.class); - ret = (String) getJar.invoke(null, my_class); - } catch (ClassNotFoundException e) { - log.debug("Using backported JarFinder."); - ret = jarFinderGetJar(my_class); - } catch (InvocationTargetException e) { - // function was properly called, but threw it's own exception. - // Unwrap it - // and pass it on. - throw new RuntimeException(e.getCause()); - } catch (Exception e) { - // toss all other exceptions, related to reflection failure - throw new RuntimeException("getJar invocation failed.", e); - } - - return ret; - } - - /** * Returns the full path to the Jar containing the class. It always return a * JAR. * Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java Wed Feb 22 09:43:41 2017 @@ -29,7 +29,6 @@ import org.apache.pig.ExecType; import org.apache.pig.PigConstants; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; public class ConfigurationUtil { @@ -89,7 +88,7 @@ public class ConfigurationUtil { // so build/classes/hadoop-site.xml contains such entry. This prevents some tests from // successful (They expect those files in hdfs), so we need to unset it in hadoop 23. // This should go away once MiniMRCluster fix the distributed cache issue. 
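+        // Configuration.unset(String) is public API in Hadoop 2.x, so the
+        // reflection-based HadoopShims helper can be replaced with a direct call: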
-            HadoopShims.unsetConf(localConf, MRConfiguration.JOB_CACHE_FILES);
+            localConf.unset(MRConfiguration.JOB_CACHE_FILES);
         }
         localConf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
         Properties props = ConfigurationUtil.toProperties(localConf);
@@ -106,4 +105,14 @@
             }
         }
     }
+
+    /**
+     * Returns a Properties object that maps every known name of the given
+     * property (deprecated and current alike) to the same value; useful for
+     * resolving deprecated configuration keys.
+     * @param name the property name, possibly deprecated
+     * @param value the value to associate with every name of the property
+     * @return the expanded properties
+     */
+    public static Properties expandForAlternativeNames(String name, String value) {
+        final Configuration config = new Configuration(false);
+        config.set(name, value);
+        return ConfigurationUtil.toProperties(config);
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java Wed Feb 22 09:43:41 2017
@@ -18,20 +18,20 @@
 package org.apache.pig.backend.hadoop.datastorage;
 
-import java.net.URI;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
 import java.util.Enumeration;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
@@ -40,8 +40,6 @@ import org.apache.pig.backend.datastorag
 
 public class HDataStorage implements DataStorage {
 
-    private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
-
     private FileSystem fs;
     private Configuration configuration;
     private Properties properties;
@@ -58,9 +56,10 @@ public class HDataStorage implements Dat
         init();
     }
 
+    @Override
     public void init() {
         // check if name node is set, if not we set local as fail back
-        String nameNode = this.properties.getProperty(FILE_SYSTEM_LOCATION);
+        String nameNode = this.properties.getProperty(FileSystem.FS_DEFAULT_NAME_KEY);
         if (nameNode == null || nameNode.length() == 0) {
             nameNode = "local";
         }
@@ -76,14 +75,17 @@
         }
     }
 
+    @Override
     public void close() throws IOException {
         fs.close();
     }
-
+
+    @Override
     public Properties getConfiguration() {
         return this.properties;
     }
 
+    @Override
     public void updateConfiguration(Properties newConfiguration)
             throws DataStorageException {
         // TODO sgroschupf 25Feb2008 this method is never called and
@@ -92,38 +94,40 @@
         if (newConfiguration == null) {
             return;
         }
-
+
         Enumeration<Object> newKeys = newConfiguration.keys();
-
+
         while (newKeys.hasMoreElements()) {
             String key = (String) newKeys.nextElement();
             String value = null;
-
+
             value = newConfiguration.getProperty(key);
-
+
             fs.getConf().set(key,value);
         }
     }
-
+
+    @Override
     public Map<String, Object> getStatistics() throws IOException {
        Map<String, Object> stats = new HashMap<String, Object>();

        long usedBytes =
fs.getUsed(); stats.put(USED_BYTES_KEY , Long.valueOf(usedBytes).toString()); - + if (fs instanceof DistributedFileSystem) { DistributedFileSystem dfs = (DistributedFileSystem) fs; - + long rawCapacityBytes = dfs.getRawCapacity(); stats.put(RAW_CAPACITY_KEY, Long.valueOf(rawCapacityBytes).toString()); - + long rawUsedBytes = dfs.getRawUsed(); stats.put(RAW_USED_KEY, Long.valueOf(rawUsedBytes).toString()); } - + return stats; } - + + @Override public ElementDescriptor asElement(String name) throws DataStorageException { if (this.isContainer(name)) { return new HDirectory(this, name); @@ -132,70 +136,82 @@ public class HDataStorage implements Dat return new HFile(this, name); } } - + + @Override public ElementDescriptor asElement(ElementDescriptor element) throws DataStorageException { return asElement(element.toString()); } - + + @Override public ElementDescriptor asElement(String parent, - String child) + String child) throws DataStorageException { return asElement((new Path(parent, child)).toString()); } + @Override public ElementDescriptor asElement(ContainerDescriptor parent, - String child) + String child) throws DataStorageException { return asElement(parent.toString(), child); } + @Override public ElementDescriptor asElement(ContainerDescriptor parent, - ElementDescriptor child) + ElementDescriptor child) throws DataStorageException { return asElement(parent.toString(), child.toString()); } - public ContainerDescriptor asContainer(String name) + @Override + public ContainerDescriptor asContainer(String name) throws DataStorageException { return new HDirectory(this, name); } - + + @Override public ContainerDescriptor asContainer(ContainerDescriptor container) throws DataStorageException { return new HDirectory(this, container.toString()); } - + + @Override public ContainerDescriptor asContainer(String parent, - String child) + String child) throws DataStorageException { return new HDirectory(this, parent, child); } + @Override public ContainerDescriptor asContainer(ContainerDescriptor parent, - String child) + String child) throws DataStorageException { return new HDirectory(this, parent.toString(), child); } - + + @Override public ContainerDescriptor asContainer(ContainerDescriptor parent, ContainerDescriptor child) throws DataStorageException { return new HDirectory(this, parent.toString(), child.toString()); } - + + @Override public void setActiveContainer(ContainerDescriptor container) { fs.setWorkingDirectory(new Path(container.toString())); } - + + @Override public ContainerDescriptor getActiveContainer() { return new HDirectory(this, fs.getWorkingDirectory()); } + @Override public boolean isContainer(String name) throws DataStorageException { boolean isContainer = false; Path path = new Path(name); - + try { if ((this.fs.exists(path)) && (! 
this.fs.isFile(path))) { isContainer = true; @@ -206,10 +222,11 @@ public class HDataStorage implements Dat String msg = "Unable to check name " + name; throw new DataStorageException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e); } - + return isContainer; } - + + @Override public HPath[] asCollection(String pattern) throws DataStorageException { try { FileStatus[] paths = this.fs.globStatus(new Path(pattern)); @@ -218,7 +235,7 @@ public class HDataStorage implements Dat return new HPath[0]; List<HPath> hpaths = new ArrayList<HPath>(); - + for (int i = 0; i < paths.length; ++i) { HPath hpath = (HPath)this.asElement(paths[i].getPath().toString()); if (!hpath.systemElement()) { @@ -233,7 +250,7 @@ public class HDataStorage implements Dat throw new DataStorageException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e); } } - + public FileSystem getHFS() { return fs; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Feb 22 09:43:41 2017 @@ -30,6 +30,7 @@ import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.mapred.JobConf; import org.apache.pig.PigException; @@ -76,8 +77,6 @@ public abstract class HExecutionEngine i public static final String MAPRED_DEFAULT_SITE = "mapred-default.xml"; public static final String YARN_DEFAULT_SITE = "yarn-default.xml"; - public static final String FILE_SYSTEM_LOCATION = "fs.default.name"; - public static final String ALTERNATIVE_FILE_SYSTEM_LOCATION = "fs.defaultFS"; public static final String LOCAL = "local"; protected PigContext pigContext; @@ -203,8 +202,8 @@ public abstract class HExecutionEngine i properties.setProperty(MRConfiguration.FRAMEWORK_NAME, LOCAL); } properties.setProperty(MRConfiguration.JOB_TRACKER, LOCAL); - properties.setProperty(FILE_SYSTEM_LOCATION, "file:///"); - properties.setProperty(ALTERNATIVE_FILE_SYSTEM_LOCATION, "file:///"); + properties.remove("fs.default.name"); //Deprecated in Hadoop 2.x + properties.setProperty(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); jc = getLocalConf(); JobConf s3Jc = getS3Conf(); @@ -220,24 +219,7 @@ public abstract class HExecutionEngine i HKerberos.tryKerberosKeytabLogin(jc); cluster = jc.get(MRConfiguration.JOB_TRACKER); - nameNode = jc.get(FILE_SYSTEM_LOCATION); - if (nameNode == null) { - nameNode = (String) pigContext.getProperties().get(ALTERNATIVE_FILE_SYSTEM_LOCATION); - } - - if (cluster != null && cluster.length() > 0) { - if (!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) { - cluster = cluster + ":50020"; - } - properties.setProperty(MRConfiguration.JOB_TRACKER, cluster); - } - - if (nameNode != null && nameNode.length() > 0) { - if (!nameNode.contains(":") && !nameNode.equalsIgnoreCase(LOCAL)) { - nameNode = nameNode + ":8020"; - } - properties.setProperty(FILE_SYSTEM_LOCATION, nameNode); - } + nameNode = jc.get(FileSystem.FS_DEFAULT_NAME_KEY); 
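+        // No host:port fix-up (":50020" / ":8020") is needed here any more:
+        // Hadoop 2.x resolves fs.defaultFS itself, and a null value simply
+        // means the local file system.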
LOG.info("Connecting to hadoop file system at: " + (nameNode == null ? LOCAL : nameNode)); @@ -369,7 +351,11 @@ public abstract class HExecutionEngine i @Override public void setProperty(String property, String value) { Properties properties = pigContext.getProperties(); - properties.put(property, value); + if (Configuration.isDeprecated(property)) { + properties.putAll(ConfigurationUtil.expandForAlternativeNames(property, value)); + } else { + properties.put(property, value); + } } @Override @@ -378,6 +364,13 @@ public abstract class HExecutionEngine i } @Override + public void kill() throws BackendException { + if (launcher != null) { + launcher.kill(); + } + } + + @Override public void killJob(String jobID) throws BackendException { if (launcher != null) { launcher.killJob(jobID, getJobConf()); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Wed Feb 22 09:43:41 2017 @@ -40,7 +40,7 @@ import org.apache.pig.tools.pigstats.Pig public class HJob implements ExecJob { private final Log log = LogFactory.getLog(getClass()); - + protected JOB_STATUS status; protected PigContext pigContext; protected FileSpec outFileSpec; @@ -48,7 +48,7 @@ public class HJob implements ExecJob { protected String alias; protected POStore poStore; private PigStats stats; - + public HJob(JOB_STATUS status, PigContext pigContext, POStore store, @@ -59,7 +59,7 @@ public class HJob implements ExecJob { this.outFileSpec = poStore.getSFile(); this.alias = alias; } - + public HJob(JOB_STATUS status, PigContext pigContext, POStore store, @@ -72,37 +72,41 @@ public class HJob implements ExecJob { this.alias = alias; this.stats = stats; } - + + @Override public JOB_STATUS getStatus() { return status; } - + + @Override public boolean hasCompleted() throws ExecException { return true; } - + + @Override public Iterator<Tuple> getResults() throws ExecException { final LoadFunc p; - + try{ - LoadFunc originalLoadFunc = + LoadFunc originalLoadFunc = (LoadFunc)PigContext.instantiateFuncFromSpec( outFileSpec.getFuncSpec()); - - p = (LoadFunc) new ReadToEndLoader(originalLoadFunc, + + p = (LoadFunc) new ReadToEndLoader(originalLoadFunc, ConfigurationUtil.toConfiguration( - pigContext.getProperties()), outFileSpec.getFileName(), 0, pigContext); + pigContext.getProperties()), outFileSpec.getFileName(), 0); }catch (Exception e){ int errCode = 2088; String msg = "Unable to get results for: " + outFileSpec; throw new ExecException(msg, errCode, PigException.BUG, e); } - + return new Iterator<Tuple>() { Tuple t; boolean atEnd; + @Override public boolean hasNext() { if (atEnd) return false; @@ -120,6 +124,7 @@ public class HJob implements ExecJob { return !atEnd; } + @Override public Tuple next() { Tuple next = t; if (next != null) { @@ -136,6 +141,7 @@ public class HJob implements ExecJob { return next; } + @Override public void remove() { throw new RuntimeException("Removal not supported"); } @@ -143,31 +149,38 @@ public class HJob implements ExecJob { }; } + @Override public Properties getConfiguration() { return pigContext.getProperties(); } + @Override public PigStats getStatistics() { 
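+        // stats may be null if this HJob was constructed without a PigStats instance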
//throw new UnsupportedOperationException(); return stats; } + @Override public void completionNotification(Object cookie) { throw new UnsupportedOperationException(); } - + + @Override public void kill() throws ExecException { throw new UnsupportedOperationException(); } - + + @Override public void getLogs(OutputStream log) throws ExecException { throw new UnsupportedOperationException(); } - + + @Override public void getSTDOut(OutputStream out) throws ExecException { throw new UnsupportedOperationException(); } - + + @Override public void getSTDError(OutputStream error) throws ExecException { throw new UnsupportedOperationException(); } @@ -176,6 +189,7 @@ public class HJob implements ExecJob { backendException = e; } + @Override public Exception getException() { return backendException; }
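
An illustrative sketch (not part of this commit) of the new deprecated-key
handling; the key is a real Hadoop property, but the host/port in the value
is a made-up example:

    import java.util.Properties;
    import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;

    public class DeprecationDemo {
        public static void main(String[] args) {
            // Setting a deprecated key expands to every known name for it.
            Properties props = ConfigurationUtil.expandForAlternativeNames(
                    "fs.default.name", "hdfs://demo:8020");
            // On Hadoop 2.x this should print the same value twice: once for
            // the deprecated "fs.default.name" and once for "fs.defaultFS".
            System.out.println(props.getProperty("fs.default.name"));
            System.out.println(props.getProperty("fs.defaultFS"));
        }
    }

This mirrors what HExecutionEngine.setProperty now does when
Configuration.isDeprecated(property) returns true.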