Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml 
(original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml Fri 
Feb 24 03:34:37 2017
@@ -294,144 +294,6 @@ team_parkyearslist = FOREACH (GROUP team
   </section>
 </section>
 
-<section id="bagtotuple">
-  <title>BagToTuple</title>
-  <p>Un-nests the elements of a bag into a tuple.</p>
-
-  <section>
-    <title>Syntax</title>
-    <table>
-      <tr>
-        <td>
-          <p>BagToTuple(expression)</p>
-        </td>
-      </tr>
-  </table></section>
-
-  <section>
-    <title>Terms</title>
-    <table>
-      <tr>
-        <td>
-         <p>expression</p>
-        </td>
-        <td>
-         <p>An expression with data type bag.</p>
-        </td>
-      </tr> 
-    </table>
-  </section>
-
-  <section>
-    <title>Usage</title>
-    <p>BagToTuple creates a tuple from the elements of a bag. It removes only
-      the first level of nesting; it does not recursively un-nest nested bags.
-      Unlike FLATTEN, BagToTuple will not generate multiple output records per
-      input record.
-    </p>
-  </section>
-  <section>
-    <title>Examples</title>
-    <p>In this example, a bag containing tuples with one field is converted to 
a tuple.</p>
-<source>
-A = LOAD 'bag_data' AS (B1:bag{T1:tuple(f1:chararray)});
-
-DUMP A;
-({('a'),('b'),('c')})
-({('d'),('e'),('f')})
-
-X = FOREACH A GENERATE BagToTuple(B1);
-
-DUMP X;
-(('a','b','c'))
-(('d','e','f'))
-</source>
-    <p>In this example, a bag containing tuples with two fields is converted 
to a tuple.</p>
-<source>
-A = LOAD 'bag_data' AS (B1:bag{T1:tuple(f1:int,f2:int)});
-
-DUMP A;
-({(4,1),(7,8),(4,9)})
-({(5,8),(4,3),(3,8)})
-
-X = FOREACH A GENERATE BagToTuple(B1);
-
-DUMP X;
-((4,1,7,8,4,9))
-((5,8,4,3,3,8))
-</source>
-  </section>
-</section>
-
-<section id="bagtotuple">
-  <title>BagToTuple</title>
-  <p>Un-nests the elements of a bag into a tuple.</p>
-
-  <section>
-    <title>Syntax</title>
-    <table>
-      <tr>
-        <td>
-          <p>BagToTuple(expression)</p>
-        </td>
-      </tr>
-  </table></section>
-
-  <section>
-    <title>Terms</title>
-    <table>
-      <tr>
-        <td>
-         <p>expression</p>
-        </td>
-        <td>
-         <p>An expression with data type bag.</p>
-        </td>
-      </tr> 
-    </table>
-  </section>
-
-  <section>
-    <title>Usage</title>
-    <p>BagToTuple creates a tuple from the elements of a bag. It removes only
-      the first level of nesting; it does not recursively un-nest nested bags.
-      Unlike FLATTEN, BagToTuple will not generate multiple output records per
-      input record.
-    </p>
-  </section>
-  <section>
-    <title>Examples</title>
-    <p>In this example, a bag containing tuples with one field is converted to 
a tuple.</p>
-<source>
-A = LOAD 'bag_data' AS (B1:bag{T1:tuple(f1:chararray)});
-
-DUMP A;
-({('a'),('b'),('c')})
-({('d'),('e'),('f')})
-
-X = FOREACH A GENERATE BagToTuple(B1);
-
-DUMP X;
-(('a','b','c'))
-(('d','e','f'))
-</source>
-    <p>In this example, a bag containing tuples with two fields is converted 
to a tuple.</p>
-<source>
-A = LOAD 'bag_data' AS (B1:bag{T1:tuple(f1:int,f2:int)});
-
-DUMP A;
-({(4,1),(7,8),(4,9)})
-({(5,8),(4,3),(3,8)})
-
-X = FOREACH A GENERATE BagToTuple(B1);
-
-DUMP X;
-((4,1,7,8,4,9))
-((5,8,4,3,3,8))
-</source>
-  </section>
-</section>
-
 <section id="bloom">
   <title>Bloom</title>
   <p>Bloom filters are a common way to select a limited set of records before
@@ -701,80 +563,7 @@ DUMP X;
          </tr> 
    </table>
    </section></section>
-  
-
-<!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
- <section id="in">
- <title>IN</title>
- <p>IN operator allows you to easily test if an expression matches any value 
in a list of values. It is used to reduce the need for multiple OR 
conditions.</p>
-
- <section>
- <title>Syntax</title>
- <table>
-     <tr>
-          <td>
-             <p>IN (expression)</p>
-          </td>
-       </tr>
- </table></section>
-
- <section>
- <title>Terms</title>
- <table>
-     <tr>
-          <td>
-             <p>expression</p>
-          </td>
-          <td>
-             <p>An expression with data types chararray, int, long, float, 
double, bigdecimal, biginteger or bytearray.</p>
-          </td>
-       </tr>
- </table></section>
-
- <section>
- <title>Usage</title>
- <p>IN operator allows you to easily test if an expression matches any value 
in a list of values. It is used to help reduce the need for multiple OR 
conditions.</p>
- </section>
-
- <section>
- <title>Example</title>
- <p>In this example we filter out ID 4 and 6.</p>
-<source>
-A = load 'data' using PigStorage(',') AS (id:int, first:chararray, 
last:chararray, gender:chararray);
-
-DUMP A;
-(1,Christine,Romero,Female)
-(2,Sara,Hansen,Female)
-(3,Albert,Rogers,Male)
-(4,Kimberly,Morrison,Female)
-(5,Eugene,Baker,Male)
-(6,Ann,Alexander,Female)
-(7,Kathleen,Reed,Female)
-(8,Todd,Scott,Male)
-(9,Sharon,Mccoy,Female)
-(10,Evelyn,Rice,Female)
-
-X = FILTER A BY id IN (4, 6);
-DUMP X;
-(4,Kimberly,Morrison,Female)
-(6,Ann,Alexander,Female)
-</source>
- </section>
-
-<p>In this example, we're passing a BigInteger and using NOT operator, thereby 
negating the passed list of fields in the IN clause</p>
-<source>
-A = load 'data' using PigStorage(',') AS (id:biginteger, first:chararray, 
last:chararray, gender:chararray); 
-X = FILTER A BY NOT id IN (1, 3, 5, 7, 9); 
-DUMP X;
- 
-(2,Sara,Hansen,Female)
-(4,Kimberly,Morrison,Female)
-(6,Ann,Alexander,Female)
-(8,Todd,Scott,Male)
-(10,Evelyn,Rice,Female)
-</source>
-</section>
-
+   
      <!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
  <section id="count-star">
    <title>COUNT_STAR</title>
@@ -1588,80 +1377,7 @@ DUMP X;
          </tr> 
    </table>
    </section></section>
-  
-
-<!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
- <section id="in">
- <title>IN</title>
- <p>IN operator allows you to easily test if an expression matches any value 
in a list of values. It is used to reduce the need for multiple OR 
conditions.</p>
-
- <section>
- <title>Syntax</title>
- <table>
-     <tr>
-          <td>
-             <p>IN (expression)</p>
-          </td>
-       </tr>
- </table></section>
-
- <section>
- <title>Terms</title>
- <table>
-     <tr>
-          <td>
-             <p>expression</p>
-          </td>
-          <td>
-             <p>An expression with data types chararray, int, long, float, 
double, bigdecimal, biginteger or bytearray.</p>
-          </td>
-       </tr>
- </table></section>
-
- <section>
- <title>Usage</title>
- <p>IN operator allows you to easily test if an expression matches any value 
in a list of values. It is used to help reduce the need for multiple OR 
conditions.</p>
- </section>
-
- <section>
- <title>Example</title>
- <p>In this example we filter out ID 4 and 6.</p>
-<source>
-A = load 'data' using PigStorage(',') AS (id:int, first:chararray, 
last:chararray, gender:chararray);
-
-DUMP A;
-(1,Christine,Romero,Female)
-(2,Sara,Hansen,Female)
-(3,Albert,Rogers,Male)
-(4,Kimberly,Morrison,Female)
-(5,Eugene,Baker,Male)
-(6,Ann,Alexander,Female)
-(7,Kathleen,Reed,Female)
-(8,Todd,Scott,Male)
-(9,Sharon,Mccoy,Female)
-(10,Evelyn,Rice,Female)
-
-X = FILTER A BY id IN (4, 6);
-DUMP X;
-(4,Kimberly,Morrison,Female)
-(6,Ann,Alexander,Female)
-</source>
- </section>
-
-<p>In this example, we're passing a BigInteger and using NOT operator, thereby 
negating the passed list of fields in the IN clause</p>
-<source>
-A = load 'data' using PigStorage(',') AS (id:biginteger, first:chararray, 
last:chararray, gender:chararray); 
-X = FILTER A BY NOT id IN (1, 3, 5, 7, 9); 
-DUMP X;
- 
-(2,Sara,Hansen,Female)
-(4,Kimberly,Morrison,Female)
-(6,Ann,Alexander,Female)
-(8,Todd,Scott,Male)
-(10,Evelyn,Rice,Female)
-</source>
-</section>
-
+   
      <!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
    <section id="tokenize">
    <title>TOKENIZE</title>
@@ -1726,7 +1442,7 @@ DUMP X;
 <source>
 {code}
 A = LOAD 'data' AS (f1:chararray);
-B = FOREACH A GENERATE TOKENIZE (f1,'||');
+B = FOREACH A TOKENIZE (f1,'||');
 DUMP B;
 {code} 
 </source>

Modified: pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java Fri Feb 
24 03:34:37 2017
@@ -34,10 +34,10 @@ public class CounterBasedErrorHandler im
 
     public CounterBasedErrorHandler() {
         Configuration conf = UDFContext.getUDFContext().getJobConf();
-        this.minErrors = 
conf.getLong(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS,
+        this.minErrors = conf.getLong(PigConfiguration.PIG_ERRORS_MIN_RECORDS,
                 0);
         this.errorThreshold = conf.getFloat(
-                PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT, 0.0f);
+                PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT, 0.0f);
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/EvalFunc.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/EvalFunc.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/EvalFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/EvalFunc.java Fri Feb 24 03:34:37 2017
@@ -369,17 +369,4 @@ public abstract class EvalFunc<T>  {
 
     public void setEndOfAllInput(boolean endOfAllInput) {
     }
-
-    /**
-     * This will be called on both the front end and the back
-     * end during execution.
-     * @return the {@link LoadCaster} associated with this eval. Returning null
-     * indicates that casts from bytearray will pick the one associated with 
the
-     * parameters when they all come from the same loadcaster type.
-     * @throws IOException if there is an exception during LoadCaster
-     */
-    public LoadCaster getLoadCaster() throws IOException {
-        return null;
-    }
-
 }

Modified: pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java (original)
+++ pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java Fri Feb 24 03:34:37 
2017
@@ -22,7 +22,6 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -48,7 +47,6 @@ public class JVMReuseImpl {
         PigGenericMapReduce.staticDataCleanup();
         PigStatusReporter.staticDataCleanup();
         PigCombiner.Combine.staticDataCleanup();
-        DistinctCombiner.Combine.staticDataCleanup();
 
         String className = null;
         String msg = null;

Modified: pig/branches/spark/src/org/apache/pig/LoadFunc.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/LoadFunc.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/LoadFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/LoadFunc.java Fri Feb 24 03:34:37 2017
@@ -108,7 +108,7 @@ public abstract class LoadFunc {
     public abstract InputFormat getInputFormat() throws IOException;
 
     /**
-     * This will be called on both the front end and the back
+     * This will be called on the front end during planning and not on the 
back 
      * end during execution.
      * @return the {@link LoadCaster} associated with this loader. Returning 
null 
      * indicates that casts from byte array are not supported for this loader. 

Modified: pig/branches/spark/src/org/apache/pig/Main.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/Main.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/Main.java (original)
+++ pig/branches/spark/src/org/apache/pig/Main.java Fri Feb 24 03:34:37 2017
@@ -27,6 +27,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.io.Reader;
 import java.io.StringReader;
 import java.net.URL;
@@ -44,8 +45,9 @@ import java.util.jar.Attributes;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
 
-import jline.console.ConsoleReader;
-import jline.console.history.FileHistory;
+import jline.ConsoleReader;
+import jline.ConsoleReaderInputStream;
+import jline.History;
 
 import org.antlr.runtime.RecognitionException;
 import org.apache.commons.logging.Log;
@@ -57,7 +59,6 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.PigRunner.ReturnCode;
-import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.classification.InterfaceAudience;
@@ -75,7 +76,6 @@ import org.apache.pig.parser.DryRunGrunt
 import org.apache.pig.scripting.ScriptEngine;
 import org.apache.pig.scripting.ScriptEngine.SupportedScriptLang;
 import org.apache.pig.tools.cmdline.CmdLineParser;
-import org.apache.pig.tools.grunt.ConsoleReaderInputStream;
 import org.apache.pig.tools.grunt.Grunt;
 import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
 import org.apache.pig.tools.pigstats.PigStats;
@@ -100,12 +100,13 @@ import com.google.common.io.Closeables;
 public class Main {
 
     static {
-        Utils.addShutdownHookWithPriority(new Runnable() {
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+
             @Override
             public void run() {
                 FileLocalizer.deleteTempResourceFiles();
             }
-        }, PigImplConstants.SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY);
+        });
     }
 
     private final static Log log = LogFactory.getLog(Main.class);
@@ -476,7 +477,7 @@ public class Main {
                 }
 
 
-                logFileName = validateLogFile(logFileName, localFileRet.file);
+                logFileName = validateLogFile(logFileName, file);
                 pigContext.getProperties().setProperty("pig.logfile", 
(logFileName == null? "": logFileName));
 
                 // Set job name based on name of the script
@@ -487,7 +488,7 @@ public class Main {
                     new File(substFile).deleteOnExit();
                 }
 
-                scriptState.setScript(localFileRet.file);
+                scriptState.setScript(new File(file));
 
                 grunt = new Grunt(pin, pigContext);
                 gruntCalled = true;
@@ -550,13 +551,12 @@ public class Main {
                 }
                 // Interactive
                 mode = ExecMode.SHELL;
-                //Reader is created by first loading 
"pig.load.default.statements" or .pigbootup file if available
-                ConsoleReader reader = new 
ConsoleReader(Utils.getCompositeStream(System.in, properties), System.out);
-                reader.setExpandEvents(false);
-                reader.setPrompt("grunt> ");
+              //Reader is created by first loading 
"pig.load.default.statements" or .pigbootup file if available
+                ConsoleReader reader = new 
ConsoleReader(Utils.getCompositeStream(System.in, properties), new 
OutputStreamWriter(System.out));
+                reader.setDefaultPrompt("grunt> ");
                 final String HISTORYFILE = ".pig_history";
                 String historyFile = System.getProperty("user.home") + 
File.separator  + HISTORYFILE;
-                reader.setHistory(new FileHistory(new File(historyFile)));
+                reader.setHistory(new History(new File(historyFile)));
                 ConsoleReaderInputStream inputStream = new 
ConsoleReaderInputStream(reader);
                 grunt = new Grunt(new BufferedReader(new 
InputStreamReader(inputStream)), pigContext);
                 grunt.setConsoleReader(reader);
@@ -605,7 +605,7 @@ public class Main {
                     return ReturnCode.SUCCESS;
                 }
 
-                logFileName = validateLogFile(logFileName, localFileRet.file);
+                logFileName = validateLogFile(logFileName, remainders[0]);
                 pigContext.getProperties().setProperty("pig.logfile", 
(logFileName == null? "": logFileName));
 
                 if (!debug) {
@@ -660,7 +660,6 @@ public class Main {
             if(!gruntCalled) {
                 LogUtils.writeLog(e, logFileName, log, verbose, "Error before 
Pig is launched");
             }
-            killRunningJobsIfInterrupted(e, pigContext);
         } catch (Throwable e) {
             rc = ReturnCode.THROWABLE_EXCEPTION;
             PigStatsUtil.setErrorMessage(e.getMessage());
@@ -669,7 +668,6 @@ public class Main {
             if(!gruntCalled) {
                 LogUtils.writeLog(e, logFileName, log, verbose, "Error before 
Pig is launched");
             }
-            killRunningJobsIfInterrupted(e, pigContext);
         } finally {
             if (printScriptRunTime) {
                 printScriptRunTime(startTime);
@@ -696,22 +694,6 @@ public class Main {
                 + " (" + duration.getMillis() + " ms)");
     }
 
-    private static void killRunningJobsIfInterrupted(Throwable e, PigContext 
pigContext) {
-        Throwable cause = e.getCause();
-        // Kill running job when we get InterruptedException
-        // Pig thread is interrupted by mapreduce when Oozie launcher job is 
killed
-        // Shutdown hook kills running jobs, but sometimes NodeManager can 
issue
-        // a SIGKILL after AM unregisters and before shutdown hook gets to 
execute
-        // causing orphaned jobs that continue to run.
-        if (e instanceof InterruptedException || (cause != null && cause 
instanceof InterruptedException)) {
-            try {
-                pigContext.getExecutionEngine().kill();
-            } catch (BackendException be) {
-                log.error("Error while killing running jobs", be);
-            }
-        }
-    }
-
     protected static PigProgressNotificationListener makeListener(Properties 
properties) {
 
         try {
@@ -989,10 +971,11 @@ public class Main {
             System.out.println("Additionally, any Hadoop property can be 
specified.");
     }
 
-    private static String validateLogFile(String logFileName, File scriptFile) 
{
+    private static String validateLogFile(String logFileName, String 
scriptName) {
         String strippedDownScriptName = null;
 
-        if (scriptFile != null) {
+        if(scriptName != null) {
+            File scriptFile = new File(scriptName);
             if(!scriptFile.isDirectory()) {
                 String scriptFileAbsPath;
                 try {

Modified: pig/branches/spark/src/org/apache/pig/PigConfiguration.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigConfiguration.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigConfiguration.java Fri Feb 24 
03:34:37 2017
@@ -18,7 +18,6 @@
 
 package org.apache.pig;
 
-
 /**
  * Container for static configuration strings, defaults, etc. This is intended 
just for keys that can
  * be set by users, not for keys that are generally used within pig.
@@ -63,15 +62,9 @@ public class PigConfiguration {
     public static final String PIG_TEZ_OPT_UNION = "pig.tez.opt.union";
     /**
      * These keys are used to enable or disable tez union optimization for
-     * specific StoreFuncs. Optimization should be turned off for those
-     * StoreFuncs that hard code part file names and do not prefix file names
-     * with mapreduce.output.basename configuration.
-     *
-     * If the StoreFuncs implement
-     * {@link StoreFunc#supportsParallelWriteToStoreLocation()} and return true
-     * or false then that is is used to turn on or off union optimization
-     * respectively. These settings can be used for StoreFuncs that have not
-     * implemented the API yet.
+     * specific StoreFuncs so that optimization is only applied to StoreFuncs
+     * that do not hard code part file names and honor mapreduce.output.basename
+     * and is turned off for those that do not. Refer to PIG-4649.
      */
     public static final String PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS = 
"pig.tez.opt.union.supported.storefuncs";
     public static final String PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS = 
"pig.tez.opt.union.unsupported.storefuncs";
@@ -134,58 +127,6 @@ public class PigConfiguration {
     public static final String PIG_SKEWEDJOIN_REDUCE_MEM = 
"pig.skewedjoin.reduce.mem";
 
     /**
-     * Bloom join has two different kind of implementations.
-     * <ul>
-     * <li>map <br>
-     * In each map, bloom filters are computed on the join keys partitioned by
-     * the hashcode of the key with {@link #PIG_BLOOMJOIN_NUM_FILTERS} number 
of
-     * partitions. Bloom filters from different maps are then combined in the
-     * reducer producing one bloom filter per partition. This is efficient and
-     * fast if there are smaller number of maps (<10) and the number of
-     * distinct keys are not too high. It can be faster with larger number of
-     * maps and even with bigger bloom vector sizes, but the amount of data
-     * shuffled to the reducer for aggregation becomes huge making it
-     * inefficient.</li>
-     * <li>reduce <br>
-     * Join keys are sent from the map to the reducer partitioned by hashcode 
of
-     * the key with {@link #PIG_BLOOMJOIN_NUM_FILTERS} number of reducers. One
-     * bloom filter is then created per partition. This is efficient for larger
-     * datasets with lot of maps or very large
-     * {@link #PIG_BLOOMJOIN_VECTORSIZE_BYTES}. In this case size of keys sent
-     * to the reducer is smaller than sending bloom filters to reducer for
-     * aggregation making it efficient.</li>
-     * </ul>
-     * Default value is map.
-     */
-    public static final String PIG_BLOOMJOIN_STRATEGY = 
"pig.bloomjoin.strategy";
-
-    /**
-     * The number of bloom filters that will be created.
-     * Default is 1 for map strategy and 11 for reduce strategy.
-     */
-    public static final String PIG_BLOOMJOIN_NUM_FILTERS = 
"pig.bloomjoin.num.filters";
-
-    /**
-     * The size in bytes of the bit vector to be used for the bloom filter.
-     * A bigger vector size will be needed when the number of distinct keys is 
higher.
-     * Default value is 1048576 (1MB).
-     */
-    public static final String PIG_BLOOMJOIN_VECTORSIZE_BYTES = 
"pig.bloomjoin.vectorsize.bytes";
-
-    /**
-     * The type of hash function to use. Valid values are jenkins and murmur.
-     * Default is murmur.
-     */
-    public static final String PIG_BLOOMJOIN_HASH_TYPE = 
"pig.bloomjoin.hash.type";
-
-    /**
-     * The number of hash functions to be used in bloom computation. It 
determines the probability of false positives.
-     * Higher the number lower the false positives. Too high a value can 
increase the cpu time.
-     * Default value is 3.
-     */
-    public static final String PIG_BLOOMJOIN_HASH_FUNCTIONS = 
"pig.bloomjoin.hash.functions";
-
-    /**
      * This key used to control the maximum size loaded into
      * the distributed cache when doing fragment-replicated join
      */
@@ -210,12 +151,6 @@ public class PigConfiguration {
      * This key is used to configure grace parallelism in tez. Default is true.
      */
     public static final String PIG_TEZ_GRACE_PARALLELISM = 
"pig.tez.grace.parallelism";
-    /**
-     * This key is used to turn off dag recovery if there is auto parallelism.
-     * Default is false. Useful when running with Tez versions before Tez 0.8
-     * which have issues with auto parallelism during DAG recovery.
-     */
-    public static final String PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY = 
"pig.tez.auto.parallelism.disable.dag.recovery";
 
     /**
      * This key is used to configure compression for the pig input splits which
@@ -389,17 +324,17 @@ public class PigConfiguration {
     /**
      * Boolean value used to enable or disable error handling for storers
      */
-    public static final String PIG_ERROR_HANDLING_ENABLED = 
"pig.error-handling.enabled";
+    public static final String PIG_ALLOW_STORE_ERRORS = 
"pig.allow.store.errors";
 
     /**
      * Controls the minimum number of errors
      */
-    public static final String PIG_ERROR_HANDLING_MIN_ERROR_RECORDS = 
"pig.error-handling.min.error.records";
+    public static final String PIG_ERRORS_MIN_RECORDS = 
"pig.errors.min.records";
 
     /**
      * Set the threshold for percentage of errors
      */
-    public static final String PIG_ERROR_HANDLING_THRESHOLD_PERCENT = 
"pig.error-handling.error.threshold";
+    public static final String PIG_ERROR_THRESHOLD_PERCENT = 
"pig.error.threshold.percent";
 
     /**
      * Comma-delimited entries of commands/operators that must be disallowed.
@@ -476,31 +411,6 @@ public class PigConfiguration {
      */
     public static final String PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE = 
"pig.spill.unused.memory.threshold.size";
 
-    /**
-     * Log tracing id that can be used by upstream clients for tracking 
respective logs
-     */
-    public static final String PIG_LOG_TRACE_ID = "pig.log.trace.id";
-
-    /**
-     * @deprecated use {@link #PIG_LOG_TRACE_ID} instead. Will be removed in 
Pig 0.18
-     */
-    public static final String CALLER_ID = PIG_LOG_TRACE_ID;
-
-    /**
-     * Enable ATS for Pig
-     */
-    public static final String PIG_ATS_ENABLED = "pig.ats.enabled";
-
-    /**
-     * @deprecated use {@link #PIG_ATS_ENABLED} instead. Will be removed in 
Pig 0.18
-     */
-    public static final String ENABLE_ATS = PIG_ATS_ENABLED;
-
-    /**
-     * If set, Pig will override tez.am.launch.cmd-opts and 
tez.am.resource.memory.mb to optimal
-     */
-    public static final String PIG_TEZ_CONFIGURE_AM_MEMORY = 
"pig.tez.configure.am.memory";
-
     // Deprecated settings of Pig 0.13
 
     /**

Modified: pig/branches/spark/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigServer.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigServer.java Fri Feb 24 03:34:37 
2017
@@ -25,8 +25,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.io.StringReader;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -55,7 +53,6 @@ import org.apache.pig.backend.datastorag
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
-import org.apache.pig.backend.hadoop.PigATSClient;
 import org.apache.pig.backend.hadoop.executionengine.HJob;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.classification.InterfaceAudience;
@@ -244,54 +241,6 @@ public class PigServer {
         }
         PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
 
-        // log ATS event includes the caller context
-        String auditId = PigATSClient.getPigAuditId(pigContext);
-        String callerId = 
(String)pigContext.getProperties().get(PigConfiguration.PIG_LOG_TRACE_ID);
-        log.info("Pig Script ID for the session: " + auditId);
-        if (callerId != null) {
-            log.info("Caller ID for session: " + callerId);
-        }
-        if (Boolean.parseBoolean(pigContext.getProperties()
-                .getProperty(PigConfiguration.PIG_ATS_ENABLED))) {
-            if (Boolean.parseBoolean(pigContext.getProperties()
-                    .getProperty("yarn.timeline-service.enabled", "false"))) {
-                PigATSClient.ATSEvent event = new 
PigATSClient.ATSEvent(auditId, callerId);
-                try {
-                    PigATSClient.getInstance().logEvent(event);
-                } catch (Exception e) {
-                    log.warn("Error posting to ATS: ", e);
-                }
-            } else {
-                log.warn("ATS is disabled since"
-                        + " yarn.timeline-service.enabled set to false");
-            }
-
-        }
-
-        // set hdfs caller context
-        Class callerContextClass = null;
-        try {
-            callerContextClass = 
Class.forName("org.apache.hadoop.ipc.CallerContext");
-        } catch (ClassNotFoundException e) {
-            // If pre-Hadoop 2.8.0, skip setting CallerContext
-        }
-        if (callerContextClass != null) {
-            try {
-                // Reflection for the following code since it is only 
available since hadoop 2.8.0:
-                // CallerContext hdfsContext = new 
CallerContext.Builder(auditId).build();
-                // CallerContext.setCurrent(hdfsContext);
-                Class callerContextBuilderClass = 
Class.forName("org.apache.hadoop.ipc.CallerContext$Builder");
-                Constructor callerContextBuilderConstruct = 
callerContextBuilderClass.getConstructor(String.class);
-                Object builder = 
callerContextBuilderConstruct.newInstance(auditId);
-                Method builderBuildMethod = 
builder.getClass().getMethod("build");
-                Object hdfsContext = builderBuildMethod.invoke(builder);
-                Method callerContextSetCurrentMethod = 
callerContextClass.getMethod("setCurrent", hdfsContext.getClass());
-                callerContextSetCurrentMethod.invoke(callerContextClass, 
hdfsContext);
-            } catch (Exception e) {
-                // Shall not happen unless API change in future Hadoop commons
-                throw new ExecException(e);
-            }
-        }
     }
 
     private void addHadoopProperties() throws ExecException {
@@ -663,8 +612,7 @@ public class PigServer {
             pigContext.scriptingUDFs.put(path, namespace);
         }
 
-        FetchFileRet ret = FileLocalizer.fetchFile(pigContext.getProperties(), 
path);
-        File f = ret.file;
+        File f = FileLocalizer.fetchFile(pigContext.getProperties(), 
path).file;
         if (!f.canRead()) {
             int errCode = 4002;
             String msg = "Can't read file: " + path;
@@ -673,19 +621,9 @@ public class PigServer {
         }
         String cwd = new File(".").getCanonicalPath();
         String filePath = f.getCanonicalPath();
-        String nameInJar = filePath;
-        // Use the relative path in the jar, if the path specified is relative
-        if (!ret.didFetch) {
-            if (!new File(path).isAbsolute() && path.indexOf("." + File.separator) == -1) {
-                // In case of Oozie, the localized files are in a different
-                // directory symlinked to the current directory. Canonical path will not point to cwd.
-                nameInJar = path;
-            } else if (filePath.equals(cwd + File.separator + path)) {
-                // If user specified absolute path and it refers to cwd
-                nameInJar = filePath.substring(cwd.length() + 1);
-            }
-        }
-
+        //Use the relative path in the jar, if the path specified is relative
+        String nameInJar = filePath.equals(cwd + File.separator + path) ?
+                filePath.substring(cwd.length() + 1) : filePath;
         pigContext.addScriptFile(nameInJar, filePath);
         if(scriptingLang != null) {
             ScriptEngine se = ScriptEngine.getInstance(scriptingLang);

Modified: pig/branches/spark/src/org/apache/pig/StoreFunc.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/StoreFunc.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/StoreFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/StoreFunc.java Fri Feb 24 03:34:37 2017
@@ -21,14 +21,17 @@ import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
+
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 
 /**
@@ -42,21 +45,21 @@ public abstract class StoreFunc implemen
     /**
      * This method is called by the Pig runtime in the front end to convert the
      * output location to an absolute path if the location is relative. The
-     * StoreFunc implementation is free to choose how it converts a relative
+     * StoreFunc implementation is free to choose how it converts a relative 
      * location to an absolute location since this may depend on what the 
location
-     * string represent (hdfs path or some other data source).
-     *
-     *
+     * string represent (hdfs path or some other data source). 
+     *  
+     * 
      * @param location location as provided in the "store" statement of the 
script
      * @param curDir the current working direction based on any "cd" statements
      * in the script before the "store" statement. If there are no "cd" 
statements
-     * in the script, this would be the home directory -
+     * in the script, this would be the home directory - 
      * <pre>/user/<username> </pre>
      * @return the absolute location based on the arguments passed
      * @throws IOException if the conversion is not possible
      */
     @Override
-    public String relToAbsPathForStoreLocation(String location, Path curDir)
+    public String relToAbsPathForStoreLocation(String location, Path curDir) 
     throws IOException {
         return LoadFunc.getAbsolutePath(location, curDir);
     }
@@ -64,34 +67,32 @@ public abstract class StoreFunc implemen
     /**
      * Return the OutputFormat associated with StoreFunc.  This will be called
      * on the front end during planning and on the backend during
-     * execution.
+     * execution. 
      * @return the {@link OutputFormat} associated with StoreFunc
-     * @throws IOException if an exception occurs while constructing the
+     * @throws IOException if an exception occurs while constructing the 
      * OutputFormat
      *
      */
-    @Override
     public abstract OutputFormat getOutputFormat() throws IOException;
 
     /**
-     * Communicate to the storer the location where the data needs to be 
stored.
-     * The location string passed to the {@link StoreFunc} here is the
-     * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, 
Path)}
+     * Communicate to the storer the location where the data needs to be 
stored.  
+     * The location string passed to the {@link StoreFunc} here is the 
+     * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, 
Path)} 
      * This method will be called in the frontend and backend multiple times. 
Implementations
      * should bear in mind that this method is called multiple times and should
      * ensure there are no inconsistent side effects due to the multiple calls.
      * {@link #checkSchema(ResourceSchema)} will be called before any call to
      * {@link #setStoreLocation(String, Job)}.
-     *
+     * 
 
-     * @param location Location returned by
+     * @param location Location returned by 
      * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
      * @param job The {@link Job} object
      * @throws IOException if the location is not valid.
      */
-    @Override
     public abstract void setStoreLocation(String location, Job job) throws IOException;
-
+ 
     /**
      * Set the schema for data to be stored.  This will be called on the
      * front end during planning if the store is associated with a schema.
@@ -116,23 +117,21 @@ public abstract class StoreFunc implemen
      * @param writer RecordWriter to use.
      * @throws IOException if an exception occurs during initialization
      */
-    @Override
     public abstract void prepareToWrite(RecordWriter writer) throws IOException;
 
     /**
      * Write a tuple to the data store.
-     *
+     * 
      * @param t the tuple to store.
      * @throws IOException if an exception occurs during the write
      */
-    @Override
     public abstract void putNext(Tuple t) throws IOException;
-
+    
     /**
      * This method will be called by Pig both in the front end and back end to
      * pass a unique signature to the {@link StoreFunc} which it can use to 
store
      * information in the {@link UDFContext} which it needs to store between
-     * various method invocations in the front end and back end. This method
+     * various method invocations in the front end and back end. This method 
      * will be called before other methods in {@link StoreFunc}.  This is 
necessary
      * because in a Pig Latin script with multiple stores, the different
      * instances of store functions need to be able to find their (and only 
their)
@@ -143,21 +142,21 @@ public abstract class StoreFunc implemen
     public void setStoreFuncUDFContextSignature(String signature) {
         // default implementation is a no-op
     }
-
+    
     /**
      * This method will be called by Pig if the job which contains this store
      * fails. Implementations can clean up output locations in this method to
      * ensure that no incorrect/incomplete results are left in the output 
location.
      * The default implementation  deletes the output location if it
      * is a {@link FileSystem} location.
-     * @param location Location returned by
+     * @param location Location returned by 
      * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
-     * @param job The {@link Job} object - this should be used only to obtain
+     * @param job The {@link Job} object - this should be used only to obtain 
      * cluster properties through {@link Job#getConfiguration()} and not to 
set/query
-     * any runtime job information.
+     * any runtime job information. 
      */
     @Override
-    public void cleanupOnFailure(String location, Job job)
+    public void cleanupOnFailure(String location, Job job) 
     throws IOException {
         cleanupOnFailureImpl(location, job);
     }
@@ -167,19 +166,19 @@ public abstract class StoreFunc implemen
      * is successful, and some cleanup of intermediate resources is required.
      * Implementations can clean up output locations in this method to
      * ensure that no incorrect/incomplete results are left in the output 
location.
-     * @param location Location returned by
+     * @param location Location returned by 
      * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
-     * @param job The {@link Job} object - this should be used only to obtain
+     * @param job The {@link Job} object - this should be used only to obtain 
      * cluster properties through {@link Job#getConfiguration()} and not to 
set/query
-     * any runtime job information.
+     * any runtime job information. 
      */
     @Override
-    public void cleanupOnSuccess(String location, Job job)
+    public void cleanupOnSuccess(String location, Job job) 
     throws IOException {
         // DEFAULT: DO NOTHING, user-defined overrides can
         // call cleanupOnFailureImpl(location, job) or ...?
     }
-
+    
     /**
      * Default implementation for {@link #cleanupOnFailure(String, Job)}
      * and {@link #cleanupOnSuccess(String, Job)}.  This removes a file
@@ -188,56 +187,15 @@ public abstract class StoreFunc implemen
      * @param job Hadoop job, used to access the appropriate file system.
      * @throws IOException
      */
-    public static void cleanupOnFailureImpl(String location, Job job)
-    throws IOException {
+    public static void cleanupOnFailureImpl(String location, Job job) 
+    throws IOException {        
         Path path = new Path(location);
         FileSystem fs = path.getFileSystem(job.getConfiguration());
         if(fs.exists(path)){
             fs.delete(path, true);
-        }
-    }
-
-    // TODO When dropping support for JDK 7 move this as a default method to StoreFuncInterface
-    /**
-     * DAG execution engines like Tez support optimizing union by writing to
-     * output location in parallel from tasks of different vertices. Commit is
-     * called once all the vertices in the union are complete. This eliminates
-     * need to have a separate phase to read data output from previous phases,
-     * union them and write out again.
-     *
-     * Enabling the union optimization requires the OutputFormat to
-     *
-     *      1) Support creation of different part file names for tasks of 
different
-     * vertices. Conflicting filenames can create data corruption and loss.
-     * For eg: If task 0 of vertex1 and vertex2 both create filename as
-     * part-r-00000, then one of the files will be overwritten when promoting
-     * from temporary to final location leading to data loss.
-     *      FileOutputFormat has mapreduce.output.basename config which enables
-     *  naming files differently in different vertices. Classes extending
-     *  FileOutputFormat and those prefixing file names with 
mapreduce.output.basename
-     *  value will not encounter conflict. Cases like HBaseStorage which write 
to key
-     *  value store and do not produce files also should not face any conflict.
-     *
-     *      2) Support calling of commit once at the end takes care of 
promoting
-     * temporary files of the different vertices into the final location.
-     * For eg: FileOutputFormat commit algorithm handles promoting of files 
produced
-     * by tasks of different vertices into final output location without issues
-     * if there is no file name conflict. In cases like HBaseStorage, the
-     * TableOutputCommitter does nothing on commit.
-     *
-     * If custom OutputFormat used by the StoreFunc does not support the above
-     * two criteria, then false should be returned. Union optimization will be
-     * disabled for the StoreFunc.
-     *
-     * Default implementation returns null and in that case planner falls back
-     * to {@link PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS} and
-     * {@link PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS}
-     * settings to determine if the StoreFunc supports it.
-     */
-    public Boolean supportsParallelWriteToStoreLocation() {
-        return null;
+        }    
     }
-
+    
     /**
      * Issue a warning.  Warning messages are aggregated and reported to
      * the user.

Modified: pig/branches/spark/src/org/apache/pig/StreamToPig.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/StreamToPig.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/StreamToPig.java (original)
+++ pig/branches/spark/src/org/apache/pig/StreamToPig.java Fri Feb 24 03:34:37 2017
@@ -57,7 +57,7 @@ public interface StreamToPig {
     public Tuple deserialize(byte[] bytes) throws IOException;
 
     /**
-     * This will be called on both the front end and the back
+     * This will be called on the front end during planning and not on the back
      * end during execution.
      *
      * @return the {@link LoadCaster} associated with this object, or null if

Modified: 
pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Fri Feb 24 03:34:37 2017
@@ -183,14 +183,6 @@ public interface ExecutionEngine {
     public ExecutableManager getExecutableManager();
 
     /**
-     * This method is called when user requests to kill all jobs
-     * associated with the execution engine
-     *
-     * @throws BackendException
-     */
-    public void kill() throws BackendException;
-
-    /**
      * This method is called when a user requests to kill a job associated with
      * the given job id. If it is not possible for a user to kill a job, throw 
a
      * exception. It is imperative for the job id's being displayed to be 
unique

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java Fri Feb 24 03:34:37 2017
@@ -17,6 +17,8 @@
 package org.apache.pig.backend.hadoop.accumulo;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Collection;
@@ -301,8 +303,24 @@ public abstract class AbstractAccumuloSt
      */
     protected void simpleUnset(Configuration conf,
             Map<String, String> entriesToUnset) {
-        for (String key : entriesToUnset.keySet()) {
-            conf.unset(key);
+        try {
+            Method unset = conf.getClass().getMethod("unset", String.class);
+
+            for (String key : entriesToUnset.keySet()) {
+                unset.invoke(conf, key);
+            }
+        } catch (NoSuchMethodException e) {
+            log.error("Could not invoke Configuration.unset method", e);
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            log.error("Could not invoke Configuration.unset method", e);
+            throw new RuntimeException(e);
+        } catch (IllegalArgumentException e) {
+            log.error("Could not invoke Configuration.unset method", e);
+            throw new RuntimeException(e);
+        } catch (InvocationTargetException e) {
+            log.error("Could not invoke Configuration.unset method", e);
+            throw new RuntimeException(e);
         }
     }
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java Fri Feb 24 03:34:37 2017
@@ -22,6 +22,8 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLDecoder;
 import java.text.MessageFormat;
@@ -40,7 +42,6 @@ import java.util.zip.ZipOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Logger;
 
@@ -111,7 +112,7 @@ public class Utils {
         // attempt to locate an existing jar for the class.
         String jar = findContainingJar(my_class, packagedClasses);
         if (null == jar || jar.isEmpty()) {
-            jar = JarFinder.getJar(my_class);
+            jar = getJar(my_class);
             updateMap(jar, packagedClasses);
         }
 
@@ -199,6 +200,41 @@ public class Utils {
     }
 
     /**
+     * Invoke 'getJar' on a JarFinder implementation. Useful for some job
+     * configuration contexts (HBASE-8140) and also for testing on MRv2. First
+     * check if we have HADOOP-9426. Lacking that, fall back to the backport.
+     * 
+     * @param my_class
+     *            the class to find.
+     * @return a jar file that contains the class, or null.
+     */
+    private static String getJar(Class<?> my_class) {
+        String ret = null;
+        String hadoopJarFinder = "org.apache.hadoop.util.JarFinder";
+        Class<?> jarFinder = null;
+        try {
+            log.debug("Looking for " + hadoopJarFinder + ".");
+            jarFinder = Class.forName(hadoopJarFinder);
+            log.debug(hadoopJarFinder + " found.");
+            Method getJar = jarFinder.getMethod("getJar", Class.class);
+            ret = (String) getJar.invoke(null, my_class);
+        } catch (ClassNotFoundException e) {
+            log.debug("Using backported JarFinder.");
+            ret = jarFinderGetJar(my_class);
+        } catch (InvocationTargetException e) {
+            // function was properly called, but threw it's own exception.
+            // Unwrap it
+            // and pass it on.
+            throw new RuntimeException(e.getCause());
+        } catch (Exception e) {
+            // toss all other exceptions, related to reflection failure
+            throw new RuntimeException("getJar invocation failed.", e);
+        }
+
+        return ret;
+    }
+
+    /**
      * Returns the full path to the Jar containing the class. It always return 
a
      * JAR.
      * 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java Fri Feb 24 03:34:37 2017
@@ -29,6 +29,7 @@ import org.apache.pig.ExecType;
 import org.apache.pig.PigConstants;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 
 public class ConfigurationUtil {
@@ -88,7 +89,7 @@ public class ConfigurationUtil {
             // so build/classes/hadoop-site.xml contains such entry. This prevents some tests from
             // successful (They expect those files in hdfs), so we need to unset it in hadoop 23.
             // This should go away once MiniMRCluster fix the distributed cache issue.
-            localConf.unset(MRConfiguration.JOB_CACHE_FILES);
+            HadoopShims.unsetConf(localConf, MRConfiguration.JOB_CACHE_FILES);
         }
         localConf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
         Properties props = ConfigurationUtil.toProperties(localConf);
@@ -105,14 +106,4 @@ public class ConfigurationUtil {
             }
         }
     }
-
-    /**
-     * Returns Properties containing alternative names of given property and same values - can be used to solve deprecations
-     * @return
-     */
-    public static Properties expandForAlternativeNames(String name, String value){
-        final Configuration config = new Configuration(false);
-        config.set(name,value);
-        return ConfigurationUtil.toProperties(config);
-    }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java Fri Feb 24 03:34:37 2017
@@ -18,20 +18,20 @@
 
 package org.apache.pig.backend.hadoop.datastorage;
 
-import java.io.IOException;
 import java.net.URI;
+import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.HashMap;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
@@ -40,6 +40,8 @@ import org.apache.pig.backend.datastorag
 
 public class HDataStorage implements DataStorage {
 
+    private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
+
     private FileSystem fs;
     private Configuration configuration;
     private Properties properties;
@@ -56,10 +58,9 @@ public class HDataStorage implements Dat
         init();
     }
 
-    @Override
     public void init() {
         // check if name node is set, if not we set local as fail back
-        String nameNode = this.properties.getProperty(FileSystem.FS_DEFAULT_NAME_KEY);
+        String nameNode = this.properties.getProperty(FILE_SYSTEM_LOCATION);
         if (nameNode == null || nameNode.length() == 0) {
             nameNode = "local";
         }
@@ -75,17 +76,14 @@ public class HDataStorage implements Dat
         }
     }
 
-    @Override
     public void close() throws IOException {
         fs.close();
     }
-
-    @Override
+    
     public Properties getConfiguration() {
         return this.properties;
     }
 
-    @Override
     public void updateConfiguration(Properties newConfiguration)
             throws DataStorageException {
         // TODO sgroschupf 25Feb2008 this method is never called and
@@ -94,40 +92,38 @@ public class HDataStorage implements Dat
         if (newConfiguration == null) {
             return;
         }
-
+        
         Enumeration<Object> newKeys = newConfiguration.keys();
-
+        
         while (newKeys.hasMoreElements()) {
             String key = (String) newKeys.nextElement();
             String value = null;
-
+            
             value = newConfiguration.getProperty(key);
-
+            
             fs.getConf().set(key,value);
         }
     }
-
-    @Override
+    
     public Map<String, Object> getStatistics() throws IOException {
         Map<String, Object> stats = new HashMap<String, Object>();
 
         long usedBytes = fs.getUsed();
         stats.put(USED_BYTES_KEY , Long.valueOf(usedBytes).toString());
-
+        
         if (fs instanceof DistributedFileSystem) {
             DistributedFileSystem dfs = (DistributedFileSystem) fs;
-
+            
             long rawCapacityBytes = dfs.getRawCapacity();
             stats.put(RAW_CAPACITY_KEY, Long.valueOf(rawCapacityBytes).toString());
-
+            
             long rawUsedBytes = dfs.getRawUsed();
             stats.put(RAW_USED_KEY, Long.valueOf(rawUsedBytes).toString());
         }
-
+        
         return stats;
     }
-
-    @Override
+    
     public ElementDescriptor asElement(String name) throws DataStorageException {
         if (this.isContainer(name)) {
             return new HDirectory(this, name);
@@ -136,82 +132,70 @@ public class HDataStorage implements Dat
             return new HFile(this, name);
         }
     }
-
-    @Override
+    
     public ElementDescriptor asElement(ElementDescriptor element)
             throws DataStorageException {
         return asElement(element.toString());
     }
-
-    @Override
+    
     public ElementDescriptor asElement(String parent,
-                                                  String child)
+                                                  String child) 
             throws DataStorageException {
         return asElement((new Path(parent, child)).toString());
     }
 
-    @Override
     public ElementDescriptor asElement(ContainerDescriptor parent,
-                                                  String child)
+                                                  String child) 
             throws DataStorageException {
         return asElement(parent.toString(), child);
     }
 
-    @Override
     public ElementDescriptor asElement(ContainerDescriptor parent,
-                                                  ElementDescriptor child)
+                                                  ElementDescriptor child) 
             throws DataStorageException {
         return asElement(parent.toString(), child.toString());
     }
 
-    @Override
-    public ContainerDescriptor asContainer(String name)
+    public ContainerDescriptor asContainer(String name) 
             throws DataStorageException {
         return new HDirectory(this, name);
     }
-
-    @Override
+    
     public ContainerDescriptor asContainer(ContainerDescriptor container)
             throws DataStorageException {
         return new HDirectory(this, container.toString());
     }
-
-    @Override
+    
     public ContainerDescriptor asContainer(String parent,
-                                                      String child)
+                                                      String child) 
             throws DataStorageException {
         return new HDirectory(this, parent, child);
     }
 
-    @Override
     public ContainerDescriptor asContainer(ContainerDescriptor parent,
-                                                      String child)
+                                                      String child) 
             throws DataStorageException {
         return new HDirectory(this, parent.toString(), child);
     }
-
-    @Override
+    
     public ContainerDescriptor asContainer(ContainerDescriptor parent,
                                                       ContainerDescriptor 
child)
             throws DataStorageException {
         return new HDirectory(this, parent.toString(), child.toString());
     }
-
-    @Override
+    
     public void setActiveContainer(ContainerDescriptor container) {
         fs.setWorkingDirectory(new Path(container.toString()));
     }
-
-    @Override
+    
     public ContainerDescriptor getActiveContainer() {
         return new HDirectory(this, fs.getWorkingDirectory());
     }
 
-    @Override
     public boolean isContainer(String name) throws DataStorageException {
         boolean isContainer = false;
         Path path = new Path(name);
-
+        
         try {
             if ((this.fs.exists(path)) && (! this.fs.isFile(path))) {
                 isContainer = true;
@@ -222,11 +206,10 @@ public class HDataStorage implements Dat
             String msg = "Unable to check name " + name;
             throw new DataStorageException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e);
         }
-
+        
         return isContainer;
     }
-
-    @Override
+    
     public HPath[] asCollection(String pattern) throws DataStorageException {
         try {
             FileStatus[] paths = this.fs.globStatus(new Path(pattern));
@@ -235,7 +218,7 @@ public class HDataStorage implements Dat
                 return new HPath[0];
 
             List<HPath> hpaths = new ArrayList<HPath>();
-
+            
             for (int i = 0; i < paths.length; ++i) {
                 HPath hpath = (HPath)this.asElement(paths[i].getPath().toString());
                 if (!hpath.systemElement()) {
@@ -250,7 +233,7 @@ public class HDataStorage implements Dat
             throw new DataStorageException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e);
         }
     }
-
+    
     public FileSystem getHFS() {
         return fs;
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Feb 24 03:34:37 2017
@@ -30,7 +30,6 @@ import java.util.Properties;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.PigException;
@@ -77,6 +76,8 @@ public abstract class HExecutionEngine i
     public static final String MAPRED_DEFAULT_SITE = "mapred-default.xml";
     public static final String YARN_DEFAULT_SITE = "yarn-default.xml";
 
+    public static final String FILE_SYSTEM_LOCATION = "fs.default.name";
+    public static final String ALTERNATIVE_FILE_SYSTEM_LOCATION = "fs.defaultFS";
     public static final String LOCAL = "local";
 
     protected PigContext pigContext;
@@ -202,8 +203,8 @@ public abstract class HExecutionEngine i
                 properties.setProperty(MRConfiguration.FRAMEWORK_NAME, LOCAL);
             }
             properties.setProperty(MRConfiguration.JOB_TRACKER, LOCAL);
-            properties.remove("fs.default.name"); //Deprecated in Hadoop 2.x
-            properties.setProperty(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+            properties.setProperty(FILE_SYSTEM_LOCATION, "file:///");
+            properties.setProperty(ALTERNATIVE_FILE_SYSTEM_LOCATION, "file:///");
 
             jc = getLocalConf();
             JobConf s3Jc = getS3Conf();
@@ -219,7 +220,24 @@ public abstract class HExecutionEngine i
         HKerberos.tryKerberosKeytabLogin(jc);
 
         cluster = jc.get(MRConfiguration.JOB_TRACKER);
-        nameNode = jc.get(FileSystem.FS_DEFAULT_NAME_KEY);
+        nameNode = jc.get(FILE_SYSTEM_LOCATION);
+        if (nameNode == null) {
+            nameNode = (String) pigContext.getProperties().get(ALTERNATIVE_FILE_SYSTEM_LOCATION);
+        }
+
+        if (cluster != null && cluster.length() > 0) {
+            if (!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) {
+                cluster = cluster + ":50020";
+            }
+            properties.setProperty(MRConfiguration.JOB_TRACKER, cluster);
+        }
+
+        if (nameNode != null && nameNode.length() > 0) {
+            if (!nameNode.contains(":") && !nameNode.equalsIgnoreCase(LOCAL)) {
+                nameNode = nameNode + ":8020";
+            }
+            properties.setProperty(FILE_SYSTEM_LOCATION, nameNode);
+        }
 
         LOG.info("Connecting to hadoop file system at: "
                 + (nameNode == null ? LOCAL : nameNode));
@@ -351,11 +369,7 @@ public abstract class HExecutionEngine i
     @Override
     public void setProperty(String property, String value) {
         Properties properties = pigContext.getProperties();
-        if (Configuration.isDeprecated(property)) {
-            properties.putAll(ConfigurationUtil.expandForAlternativeNames(property, value));
-        } else {
-            properties.put(property, value);
-        }
+        properties.put(property, value);
     }
 
     @Override
@@ -364,13 +378,6 @@ public abstract class HExecutionEngine i
     }
 
     @Override
-    public void kill() throws BackendException {
-        if (launcher != null) {
-            launcher.kill();
-        }
-    }
-
-    @Override
     public void killJob(String jobID) throws BackendException {
         if (launcher != null) {
             launcher.killJob(jobID, getJobConf());

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java 
Fri Feb 24 03:34:37 2017
@@ -40,7 +40,7 @@ import org.apache.pig.tools.pigstats.Pig
 public class HJob implements ExecJob {
 
     private final Log log = LogFactory.getLog(getClass());
-
+    
     protected JOB_STATUS status;
     protected PigContext pigContext;
     protected FileSpec outFileSpec;
@@ -48,7 +48,7 @@ public class HJob implements ExecJob {
     protected String alias;
     protected POStore poStore;
     private PigStats stats;
-
+    
     public HJob(JOB_STATUS status,
                 PigContext pigContext,
                 POStore store,
@@ -59,7 +59,7 @@ public class HJob implements ExecJob {
         this.outFileSpec = poStore.getSFile();
         this.alias = alias;
     }
-
+    
     public HJob(JOB_STATUS status,
             PigContext pigContext,
             POStore store,
@@ -72,41 +72,37 @@ public class HJob implements ExecJob {
         this.alias = alias;
         this.stats = stats;
     }
-
-    @Override
+    
     public JOB_STATUS getStatus() {
         return status;
     }
-
-    @Override
+    
     public boolean hasCompleted() throws ExecException {
         return true;
     }
-
-    @Override
+    
     public Iterator<Tuple> getResults() throws ExecException {
         final LoadFunc p;
-
+        
         try{
-             LoadFunc originalLoadFunc =
+             LoadFunc originalLoadFunc = 
                  (LoadFunc)PigContext.instantiateFuncFromSpec(
                          outFileSpec.getFuncSpec());
-
-             p = (LoadFunc) new ReadToEndLoader(originalLoadFunc,
+             
+             p = (LoadFunc) new ReadToEndLoader(originalLoadFunc, 
                      ConfigurationUtil.toConfiguration(
-                     pigContext.getProperties()), outFileSpec.getFileName(), 0);
+                     pigContext.getProperties()), outFileSpec.getFileName(), 0, pigContext);
 
         }catch (Exception e){
             int errCode = 2088;
             String msg = "Unable to get results for: " + outFileSpec;
             throw new ExecException(msg, errCode, PigException.BUG, e);
         }
-
+        
         return new Iterator<Tuple>() {
             Tuple   t;
             boolean atEnd;
 
-            @Override
             public boolean hasNext() {
                 if (atEnd)
                     return false;
@@ -124,7 +120,6 @@ public class HJob implements ExecJob {
                 return !atEnd;
             }
 
-            @Override
             public Tuple next() {
                 Tuple next = t;
                 if (next != null) {
@@ -141,7 +136,6 @@ public class HJob implements ExecJob {
                 return next;
             }
 
-            @Override
             public void remove() {
                 throw new RuntimeException("Removal not supported");
             }
@@ -149,38 +143,31 @@ public class HJob implements ExecJob {
         };
     }
 
-    @Override
     public Properties getConfiguration() {
         return pigContext.getProperties();
     }
 
-    @Override
     public PigStats getStatistics() {
         //throw new UnsupportedOperationException();
         return stats;
     }
 
-    @Override
     public void completionNotification(Object cookie) {
         throw new UnsupportedOperationException();
     }
-
-    @Override
+    
     public void kill() throws ExecException {
         throw new UnsupportedOperationException();
     }
-
-    @Override
+    
     public void getLogs(OutputStream log) throws ExecException {
         throw new UnsupportedOperationException();
     }
-
-    @Override
+    
     public void getSTDOut(OutputStream out) throws ExecException {
         throw new UnsupportedOperationException();
     }
-
-    @Override
+    
     public void getSTDError(OutputStream error) throws ExecException {
         throw new UnsupportedOperationException();
     }
@@ -189,7 +176,6 @@ public class HJob implements ExecJob {
         backendException = e;
     }
 
-    @Override
     public Exception getException() {
         return backendException;
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
 Fri Feb 24 03:34:37 2017
@@ -32,8 +32,7 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.TIPStatus;
-import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapred.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.FuncSpec;
@@ -41,6 +40,7 @@ import org.apache.pig.PigException;
 import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.PlanException;
@@ -76,7 +76,7 @@ public abstract class Launcher {
     protected Map<FileSpec, Exception> failureMap;
     protected JobControl jc = null;
 
-    protected class HangingJobKiller extends Thread {
+    class HangingJobKiller extends Thread {
         public HangingJobKiller() {}
 
         @Override
@@ -90,6 +90,7 @@ public abstract class Launcher {
     }
 
     protected Launcher() {
+        Runtime.getRuntime().addShutdownHook(new HangingJobKiller());
         // handle the windows portion of \r
         if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
             newLine = "\r\n";
@@ -103,6 +104,7 @@ public abstract class Launcher {
     public void reset() {
         failureMap = Maps.newHashMap();
         totalHadoopTimeSpent = 0;
+        jc = null;
     }
 
     /**
@@ -177,7 +179,7 @@ public abstract class Launcher {
             String exceptionCreateFailMsg = null;
             boolean jobFailed = false;
             if (msgs.length > 0) {
-                if (report.getCurrentStatus()== TIPStatus.FAILED) {
+                if (HadoopShims.isJobFailed(report)) {
                     jobFailed = true;
                 }
                 Set<String> errorMessageSet = new HashSet<String>();
@@ -259,30 +261,11 @@ public abstract class Launcher {
 
         List<Job> runnJobs = jc.getRunningJobs();
         for (Job j : runnJobs) {
-            prog += progressOfRunningJob(j);
+            prog += HadoopShims.progressOfRunningJob(j);
         }
         return prog;
     }
 
-    /**
-     * Returns the progress of a Job j which is part of a submitted JobControl
-     * object. The progress is for this Job. So it has to be scaled down by the
-     * num of jobs that are present in the JobControl.
-     *
-     * @param j The Job for which progress is required
-     * @return Returns the percentage progress of this Job
-     * @throws IOException
-     */
-    private static double progressOfRunningJob(Job j)
-            throws IOException {
-        org.apache.hadoop.mapreduce.Job mrJob = j.getJob();
-        try {
-            return (mrJob.mapProgress() + mrJob.reduceProgress()) / 2;
-        } catch (Exception ir) {
-            return 0;
-        }
-    }
-
     public long getTotalHadoopTimeSpent() {
         return totalHadoopTimeSpent;
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
 Fri Feb 24 03:34:37 2017
@@ -25,7 +25,6 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
@@ -123,8 +122,7 @@ public class FetchLauncher {
         poStore.setUp();
 
         TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID();
-        //Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
-        conf.setInt(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, taskAttemptID.getId());
+        HadoopShims.setTaskAttemptId(conf, taskAttemptID);
 
         if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
             MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf);

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java
 Fri Feb 24 03:34:37 2017
@@ -95,7 +95,7 @@ public class FetchPOStoreImpl extends PO
         }
         if (outputCommitter.needsTaskCommit(context))
             outputCommitter.commitTask(context);
-        outputCommitter.commitJob(context);
+        HadoopShims.commitOrCleanup(outputCommitter, context);
     }
 
     @Override
@@ -109,7 +109,7 @@ public class FetchPOStoreImpl extends PO
             }
             writer = null;
         }
-        outputCommitter.commitJob(context);
+        HadoopShims.commitOrCleanup(outputCommitter, context);
     }
 
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java
 Fri Feb 24 03:34:37 2017
@@ -22,48 +22,43 @@ import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.pig.impl.io.NullableTuple;
+
 import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.io.NullableTuple;
 
 /**
  * A special implementation of combiner used only for distinct.  This combiner
  * does not even parse out the records.  It just throws away duplicate values
- * in the key in order to minimize the data being sent to the reduce.
+ * in the key in order ot minimize the data being sent to the reduce.
  */
 public class DistinctCombiner {
 
-    public static class Combine
+    public static class Combine 
         extends Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
-
+        
         private final Log log = LogFactory.getLog(getClass());
 
-        private static boolean firstTime = true;
-
-        //@StaticDataCleanup
-        public static void staticDataCleanup() {
-            firstTime = true;
-        }
-
+        ProgressableReporter pigReporter;
+        
+        /**
+         * Configures the reporter 
+         */
         @Override
         protected void setup(Context context) throws IOException, InterruptedException {
             super.setup(context);
-            Configuration jConf = context.getConfiguration();
-            // Avoid log spamming
-            if (firstTime) {
-                log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location"));
-                firstTime = false;
-            }
+            pigReporter = new ProgressableReporter();
         }
-
+        
         /**
          * The reduce function which removes values.
          */
         @Override
-        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> 
tupIter, Context context)
+        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> 
tupIter, Context context) 
                 throws IOException, InterruptedException {
+            
+            pigReporter.setRep(context);
 
             // Take the first value and the key and collect
             // just that.
@@ -71,7 +66,6 @@ public class DistinctCombiner {
             NullableTuple val = iter.next();
             context.write(key, val);
         }
-
     }
-
+    
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
 Fri Feb 24 03:34:37 2017
@@ -75,24 +75,16 @@ public class FileBasedOutputSizeReader i
             return -1;
         }
 
-        Path p = new Path(getLocationUri(sto));
-        return getPathSize(p, p.getFileSystem(conf));
-    }
-
-    private long getPathSize(Path storePath, FileSystem fs) throws IOException {
         long bytes = 0;
-        FileStatus[] lst = fs.listStatus(storePath);
+        Path p = new Path(getLocationUri(sto));
+        FileSystem fs = p.getFileSystem(conf);
+        FileStatus[] lst = fs.listStatus(p);
         if (lst != null) {
             for (FileStatus status : lst) {
-                if (status.isFile()) {
-                    if (status.getLen() > 0)
-                        bytes += status.getLen();
-                }
-                else { // recursively count nested leaves' (files) sizes
-                    bytes += getPathSize(status.getPath(), fs);
-                }
+                bytes += status.getLen();
             }
         }
+
         return bytes;
     }
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
 Fri Feb 24 03:34:37 2017
@@ -92,7 +92,7 @@ public class InputSizeReducerEstimator i
         return reducers;
     }
 
-    public static long getTotalInputFileSize(Configuration conf,
+    static long getTotalInputFileSize(Configuration conf,
             List<POLoad> lds, Job job) throws IOException {
         return getTotalInputFileSize(conf, lds, job, Long.MAX_VALUE);
     }
@@ -100,7 +100,7 @@ public class InputSizeReducerEstimator i
     /**
      * Get the input size for as many inputs as possible. Inputs that do not report
      * their size nor can pig look that up itself are excluded from this size.
-     *
+     * 
      * @param conf Configuration
      * @param lds List of POLoads
      * @param job Job


Reply via email to