Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml 
(original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml Wed 
Feb 22 09:43:41 2017
@@ -294,6 +294,144 @@ team_parkyearslist = FOREACH (GROUP team
   </section>
 </section>
 
+<section id="bagtotuple">
+  <title>BagToTuple</title>
+  <p>Un-nests the elements of a bag into a tuple.</p>
+
+  <section>
+    <title>Syntax</title>
+    <table>
+      <tr>
+        <td>
+          <p>BagToTuple(expression)</p>
+        </td>
+      </tr>
+  </table></section>
+
+  <section>
+    <title>Terms</title>
+    <table>
+      <tr>
+        <td>
+         <p>expression</p>
+        </td>
+        <td>
+         <p>An expression with data type bag.</p>
+        </td>
+      </tr> 
+    </table>
+  </section>
+
+  <section>
+    <title>Usage</title>
+    <p>BagToTuple creates a tuple from the elements of a bag. It removes only
+      the first level of nesting; it does not recursively un-nest nested bags.
+      Unlike FLATTEN, BagToTuple will not generate multiple output records per
+      input record.
+    </p>
+  </section>
+  <section>
+    <title>Examples</title>
+    <p>In this example, a bag containing tuples with one field is converted to 
a tuple.</p>
+<source>
+A = LOAD 'bag_data' AS (B1:bag{T1:tuple(f1:chararray)});
+
+DUMP A;
+({('a'),('b'),('c')})
+({('d'),('e'),('f')})
+
+X = FOREACH A GENERATE BagToTuple(B1);
+
+DUMP X;
+(('a','b','c'))
+(('d','e','f'))
+</source>
+    <p>In this example, a bag containing tuples with two fields is converted 
to a tuple.</p>
+<source>
+A = LOAD 'bag_data' AS (B1:bag{T1:tuple(f1:int,f2:int)});
+
+DUMP A;
+({(4,1),(7,8),(4,9)})
+({(5,8),(4,3),(3,8)})
+
+X = FOREACH A GENERATE BagToTuple(B1);
+
+DUMP X;
+((4,1,7,8,4,9))
+((5,8,4,3,3,8))
+</source>
+  </section>
+</section>
+
 <section id="bloom">
   <title>Bloom</title>
   <p>Bloom filters are a common way to select a limited set of records before
@@ -563,7 +701,80 @@ DUMP X;
          </tr> 
    </table>
    </section></section>
-   
+  
+
+<!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
+ <section id="in">
+ <title>IN</title>
+ <p>The IN operator allows you to test whether an expression matches any value in a list of values, reducing the need for multiple OR conditions.</p>
+
+ <section>
+ <title>Syntax</title>
+ <table>
+     <tr>
+          <td>
+             <p>IN (expression)</p>
+          </td>
+       </tr>
+ </table></section>
+
+ <section>
+ <title>Terms</title>
+ <table>
+     <tr>
+          <td>
+             <p>expression</p>
+          </td>
+          <td>
+             <p>An expression with data types chararray, int, long, float, 
double, bigdecimal, biginteger or bytearray.</p>
+          </td>
+       </tr>
+ </table></section>
+
+ <section>
+ <title>Usage</title>
+ <p>The IN operator tests whether an expression matches any value in a list of values; it helps reduce the need for multiple OR conditions.</p>
+ </section>
+
+ <section>
+ <title>Example</title>
+ <p>In this example we keep only the records with IDs 4 and 6.</p>
+<source>
+A = load 'data' using PigStorage(',') AS (id:int, first:chararray, 
last:chararray, gender:chararray);
+
+DUMP A;
+(1,Christine,Romero,Female)
+(2,Sara,Hansen,Female)
+(3,Albert,Rogers,Male)
+(4,Kimberly,Morrison,Female)
+(5,Eugene,Baker,Male)
+(6,Ann,Alexander,Female)
+(7,Kathleen,Reed,Female)
+(8,Todd,Scott,Male)
+(9,Sharon,Mccoy,Female)
+(10,Evelyn,Rice,Female)
+
+X = FILTER A BY id IN (4, 6);
+DUMP X;
+(4,Kimberly,Morrison,Female)
+(6,Ann,Alexander,Female)
+</source>
+ </section>
+
+<p>In this example, the id field is a biginteger and the NOT operator negates the IN clause, keeping only the records whose IDs are not in the list.</p>
+<source>
+A = load 'data' using PigStorage(',') AS (id:biginteger, first:chararray, 
last:chararray, gender:chararray); 
+X = FILTER A BY NOT id IN (1, 3, 5, 7, 9); 
+DUMP X;
+ 
+(2,Sara,Hansen,Female)
+(4,Kimberly,Morrison,Female)
+(6,Ann,Alexander,Female)
+(8,Todd,Scott,Male)
+(10,Evelyn,Rice,Female)
+</source>
+</section>
+
      <!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
  <section id="count-star">
    <title>COUNT_STAR</title>
@@ -1377,7 +1588,80 @@ DUMP X;
          </tr> 
    </table>
    </section></section>
-   
+  
+
+<!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
+ <section id="in">
+ <title>IN</title>
+ <p>The IN operator allows you to test whether an expression matches any value in a list of values, reducing the need for multiple OR conditions.</p>
+
+ <section>
+ <title>Syntax</title>
+ <table>
+     <tr>
+          <td>
+             <p>IN (expression)</p>
+          </td>
+       </tr>
+ </table></section>
+
+ <section>
+ <title>Terms</title>
+ <table>
+     <tr>
+          <td>
+             <p>expression</p>
+          </td>
+          <td>
+             <p>An expression with data types chararray, int, long, float, 
double, bigdecimal, biginteger or bytearray.</p>
+          </td>
+       </tr>
+ </table></section>
+
+ <section>
+ <title>Usage</title>
+ <p>The IN operator tests whether an expression matches any value in a list of values; it helps reduce the need for multiple OR conditions.</p>
+ </section>
+
+ <section>
+ <title>Example</title>
+ <p>In this example we keep only the records with IDs 4 and 6.</p>
+<source>
+A = load 'data' using PigStorage(',') AS (id:int, first:chararray, 
last:chararray, gender:chararray);
+
+DUMP A;
+(1,Christine,Romero,Female)
+(2,Sara,Hansen,Female)
+(3,Albert,Rogers,Male)
+(4,Kimberly,Morrison,Female)
+(5,Eugene,Baker,Male)
+(6,Ann,Alexander,Female)
+(7,Kathleen,Reed,Female)
+(8,Todd,Scott,Male)
+(9,Sharon,Mccoy,Female)
+(10,Evelyn,Rice,Female)
+
+X = FILTER A BY id IN (4, 6);
+DUMP X;
+(4,Kimberly,Morrison,Female)
+(6,Ann,Alexander,Female)
+</source>
+ </section>
+
+<p>In this example, the id field is a biginteger and the NOT operator negates the IN clause, keeping only the records whose IDs are not in the list.</p>
+<source>
+A = load 'data' using PigStorage(',') AS (id:biginteger, first:chararray, 
last:chararray, gender:chararray); 
+X = FILTER A BY NOT id IN (1, 3, 5, 7, 9); 
+DUMP X;
+ 
+(2,Sara,Hansen,Female)
+(4,Kimberly,Morrison,Female)
+(6,Ann,Alexander,Female)
+(8,Todd,Scott,Male)
+(10,Evelyn,Rice,Female)
+</source>
+</section>
+
      <!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
    <section id="tokenize">
    <title>TOKENIZE</title>
@@ -1442,7 +1726,7 @@ DUMP X;
 <source>
 {code}
 A = LOAD 'data' AS (f1:chararray);
-B = FOREACH A TOKENIZE (f1,'||');
+B = FOREACH A GENERATE TOKENIZE (f1,'||');
 DUMP B;
 {code} 
 </source>

Added: pig/branches/spark/src/docs/src/documentation/content/xdocs/v_editors.xml
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/v_editors.xml?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/v_editors.xml 
(added)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/v_editors.xml 
Wed Feb 22 09:43:41 2017
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN" "http://forrest.apache.org/dtd/document-v20.dtd">
+
+<document>
+    <header>
+        <title>Visual Editors</title>
+    </header>
+    <body>
+
+        <!-- 
====================================================================== -->
+        <!-- v_editors-->
+        <section id="zeppelin">
+            <title>Running Pig in Apache Zeppelin</title>
+
+            <p><a href="https://zeppelin.apache.org/">Apache Zeppelin</a> is a web-based notebook that enables interactive data analytics. Pig is supported as a backend interpreter in Zeppelin starting with Zeppelin 0.7.</p>
+            <p>Users can do everything in Zeppelin that they can do in the Grunt shell. In addition, they can take advantage of Zeppelin's visualization features to visualize Pig output. The following links describe how to configure Pig in Zeppelin and how to run Pig scripts in Zeppelin.</p>
+
+            <ul>
+                <li>
+                    <a href="https://zeppelin.apache.org/docs/latest/interpreter/pig.html">https://zeppelin.apache.org/docs/latest/interpreter/pig.html</a>
+                </li>
+                <li>
+                    <a href="https://cwiki.apache.org/confluence/display/ZEPPELIN/Running+Pig+in+Apache+Zeppelin">https://cwiki.apache.org/confluence/display/ZEPPELIN/Running+Pig+in+Apache+Zeppelin</a>
+                </li>
+            </ul>
+
+            <p><strong>Screenshot of running Pig in Zeppelin</strong> </p>
+            <p><img alt="Pig in zeppelin" src="images/pig_zeppelin.png" 
title="Pig in zeppelin"></img></p>
+
+        </section>
+    </body>
+</document>
\ No newline at end of file

Modified: pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java Wed Feb 
22 09:43:41 2017
@@ -34,10 +34,10 @@ public class CounterBasedErrorHandler im
 
     public CounterBasedErrorHandler() {
         Configuration conf = UDFContext.getUDFContext().getJobConf();
-        this.minErrors = conf.getLong(PigConfiguration.PIG_ERRORS_MIN_RECORDS,
+        this.minErrors = 
conf.getLong(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS,
                 0);
         this.errorThreshold = conf.getFloat(
-                PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT, 0.0f);
+                PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT, 0.0f);
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/EvalFunc.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/EvalFunc.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/EvalFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/EvalFunc.java Wed Feb 22 09:43:41 2017
@@ -369,4 +369,17 @@ public abstract class EvalFunc<T>  {
 
     public void setEndOfAllInput(boolean endOfAllInput) {
     }
+
+    /**
+     * This will be called on both the front end and the back
+     * end during execution.
+     * @return the {@link LoadCaster} associated with this EvalFunc. Returning null
+     * indicates that casts from bytearray should use the LoadCaster associated with
+     * the input parameters, provided they all come from the same LoadCaster type.
+     * @throws IOException if there is an exception while obtaining the LoadCaster
+     */
+    public LoadCaster getLoadCaster() throws IOException {
+        return null;
+    }
+
 }
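
For illustration, a minimal sketch of a UDF overriding the new getLoadCaster() hook; the class name UpperCaseUdf is hypothetical and Utf8StorageConverter is used only as an example of a standard caster, not as the caster this patch prescribes.

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.LoadCaster;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.Tuple;

// Hypothetical example UDF, not part of this patch.
public class UpperCaseUdf extends EvalFunc<String> {

    @Override
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0 || input.get(0) == null) {
            return null;
        }
        return input.get(0).toString().toUpperCase();
    }

    @Override
    public LoadCaster getLoadCaster() throws IOException {
        // Ask Pig to cast bytearray arguments of this UDF with the standard
        // UTF-8 converter instead of deferring to the input loaders' casters.
        return new Utf8StorageConverter();
    }
}

Since the method is called on both the front end and the back end, the returned caster should be cheap to construct.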

Modified: pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java (original)
+++ pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java Wed Feb 22 09:43:41 
2017
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -47,6 +48,7 @@ public class JVMReuseImpl {
         PigGenericMapReduce.staticDataCleanup();
         PigStatusReporter.staticDataCleanup();
         PigCombiner.Combine.staticDataCleanup();
+        DistinctCombiner.Combine.staticDataCleanup();
 
         String className = null;
         String msg = null;

Modified: pig/branches/spark/src/org/apache/pig/LoadFunc.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/LoadFunc.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/LoadFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/LoadFunc.java Wed Feb 22 09:43:41 2017
@@ -108,7 +108,7 @@ public abstract class LoadFunc {
     public abstract InputFormat getInputFormat() throws IOException;
 
     /**
-     * This will be called on the front end during planning and not on the 
back 
+     * This will be called on both the front end and the back
      * end during execution.
      * @return the {@link LoadCaster} associated with this loader. Returning 
null 
      * indicates that casts from byte array are not supported for this loader. 

Modified: pig/branches/spark/src/org/apache/pig/Main.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/Main.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/Main.java (original)
+++ pig/branches/spark/src/org/apache/pig/Main.java Wed Feb 22 09:43:41 2017
@@ -27,7 +27,6 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
 import java.io.Reader;
 import java.io.StringReader;
 import java.net.URL;
@@ -45,9 +44,8 @@ import java.util.jar.Attributes;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
 
-import jline.ConsoleReader;
-import jline.ConsoleReaderInputStream;
-import jline.History;
+import jline.console.ConsoleReader;
+import jline.console.history.FileHistory;
 
 import org.antlr.runtime.RecognitionException;
 import org.apache.commons.logging.Log;
@@ -59,6 +57,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.classification.InterfaceAudience;
@@ -76,6 +75,7 @@ import org.apache.pig.parser.DryRunGrunt
 import org.apache.pig.scripting.ScriptEngine;
 import org.apache.pig.scripting.ScriptEngine.SupportedScriptLang;
 import org.apache.pig.tools.cmdline.CmdLineParser;
+import org.apache.pig.tools.grunt.ConsoleReaderInputStream;
 import org.apache.pig.tools.grunt.Grunt;
 import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
 import org.apache.pig.tools.pigstats.PigStats;
@@ -100,13 +100,12 @@ import com.google.common.io.Closeables;
 public class Main {
 
     static {
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-
+        Utils.addShutdownHookWithPriority(new Runnable() {
             @Override
             public void run() {
                 FileLocalizer.deleteTempResourceFiles();
             }
-        });
+        }, PigImplConstants.SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY);
     }
 
     private final static Log log = LogFactory.getLog(Main.class);
@@ -477,7 +476,7 @@ public class Main {
                 }
 
 
-                logFileName = validateLogFile(logFileName, file);
+                logFileName = validateLogFile(logFileName, localFileRet.file);
                 pigContext.getProperties().setProperty("pig.logfile", 
(logFileName == null? "": logFileName));
 
                 // Set job name based on name of the script
@@ -488,7 +487,7 @@ public class Main {
                     new File(substFile).deleteOnExit();
                 }
 
-                scriptState.setScript(new File(file));
+                scriptState.setScript(localFileRet.file);
 
                 grunt = new Grunt(pin, pigContext);
                 gruntCalled = true;
@@ -551,12 +550,13 @@ public class Main {
                 }
                 // Interactive
                 mode = ExecMode.SHELL;
-              //Reader is created by first loading 
"pig.load.default.statements" or .pigbootup file if available
-                ConsoleReader reader = new 
ConsoleReader(Utils.getCompositeStream(System.in, properties), new 
OutputStreamWriter(System.out));
-                reader.setDefaultPrompt("grunt> ");
+                //Reader is created by first loading 
"pig.load.default.statements" or .pigbootup file if available
+                ConsoleReader reader = new 
ConsoleReader(Utils.getCompositeStream(System.in, properties), System.out);
+                reader.setExpandEvents(false);
+                reader.setPrompt("grunt> ");
                 final String HISTORYFILE = ".pig_history";
                 String historyFile = System.getProperty("user.home") + 
File.separator  + HISTORYFILE;
-                reader.setHistory(new History(new File(historyFile)));
+                reader.setHistory(new FileHistory(new File(historyFile)));
                 ConsoleReaderInputStream inputStream = new 
ConsoleReaderInputStream(reader);
                 grunt = new Grunt(new BufferedReader(new 
InputStreamReader(inputStream)), pigContext);
                 grunt.setConsoleReader(reader);
@@ -605,7 +605,7 @@ public class Main {
                     return ReturnCode.SUCCESS;
                 }
 
-                logFileName = validateLogFile(logFileName, remainders[0]);
+                logFileName = validateLogFile(logFileName, localFileRet.file);
                 pigContext.getProperties().setProperty("pig.logfile", 
(logFileName == null? "": logFileName));
 
                 if (!debug) {
@@ -660,6 +660,7 @@ public class Main {
             if(!gruntCalled) {
                 LogUtils.writeLog(e, logFileName, log, verbose, "Error before 
Pig is launched");
             }
+            killRunningJobsIfInterrupted(e, pigContext);
         } catch (Throwable e) {
             rc = ReturnCode.THROWABLE_EXCEPTION;
             PigStatsUtil.setErrorMessage(e.getMessage());
@@ -668,6 +669,7 @@ public class Main {
             if(!gruntCalled) {
                 LogUtils.writeLog(e, logFileName, log, verbose, "Error before 
Pig is launched");
             }
+            killRunningJobsIfInterrupted(e, pigContext);
         } finally {
             if (printScriptRunTime) {
                 printScriptRunTime(startTime);
@@ -694,6 +696,22 @@ public class Main {
                 + " (" + duration.getMillis() + " ms)");
     }
 
+    private static void killRunningJobsIfInterrupted(Throwable e, PigContext 
pigContext) {
+        Throwable cause = e.getCause();
+        // Kill running job when we get InterruptedException
+        // Pig thread is interrupted by mapreduce when Oozie launcher job is 
killed
+        // Shutdown hook kills running jobs, but sometimes NodeManager can 
issue
+        // a SIGKILL after AM unregisters and before shutdown hook gets to 
execute
+        // causing orphaned jobs that continue to run.
+        if (e instanceof InterruptedException || (cause != null && cause 
instanceof InterruptedException)) {
+            try {
+                pigContext.getExecutionEngine().kill();
+            } catch (BackendException be) {
+                log.error("Error while killing running jobs", be);
+            }
+        }
+    }
+
     protected static PigProgressNotificationListener makeListener(Properties 
properties) {
 
         try {
@@ -971,11 +989,10 @@ public class Main {
             System.out.println("Additionally, any Hadoop property can be 
specified.");
     }
 
-    private static String validateLogFile(String logFileName, String 
scriptName) {
+    private static String validateLogFile(String logFileName, File scriptFile) 
{
         String strippedDownScriptName = null;
 
-        if(scriptName != null) {
-            File scriptFile = new File(scriptName);
+        if (scriptFile != null) {
             if(!scriptFile.isDirectory()) {
                 String scriptFileAbsPath;
                 try {

Added: pig/branches/spark/src/org/apache/pig/NonFSLoadFunc.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/NonFSLoadFunc.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/NonFSLoadFunc.java (added)
+++ pig/branches/spark/src/org/apache/pig/NonFSLoadFunc.java Wed Feb 22 
09:43:41 2017
@@ -0,0 +1,25 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.pig;
+
+/**
+ * Marker interface to distinguish LoadFunc implementations that don't use file system sources.
+ */
+public interface NonFSLoadFunc {
+
+}
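
For illustration, a minimal sketch of a loader that could carry this marker; ExampleTableLoader is hypothetical and its method bodies are stubs rather than a working loader.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.pig.LoadFunc;
import org.apache.pig.NonFSLoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;

// Hypothetical loader whose "location" is a table name, not a file system path.
public class ExampleTableLoader extends LoadFunc implements NonFSLoadFunc {

    @Override
    public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
        // The location is a table name, so it is returned unchanged instead of
        // being resolved against the current HDFS directory.
        return location;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        // A real loader would configure its InputFormat with the table name here.
    }

    @Override
    public InputFormat getInputFormat() throws IOException {
        // A real loader would return an InputFormat reading from the external store.
        throw new IOException("not implemented in this sketch");
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        // A real loader would keep a reference to the reader here.
    }

    @Override
    public Tuple getNext() throws IOException {
        // Returning null signals end of data.
        return null;
    }
}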

Modified: pig/branches/spark/src/org/apache/pig/PigConfiguration.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigConfiguration.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigConfiguration.java Wed Feb 22 
09:43:41 2017
@@ -18,6 +18,7 @@
 
 package org.apache.pig;
 
+
 /**
  * Container for static configuration strings, defaults, etc. This is intended 
just for keys that can
  * be set by users, not for keys that are generally used within pig.
@@ -62,9 +63,15 @@ public class PigConfiguration {
     public static final String PIG_TEZ_OPT_UNION = "pig.tez.opt.union";
     /**
      * These keys are used to enable or disable tez union optimization for
-     * specific StoreFuncs so that optimization is only applied to StoreFuncs
-     * that do not hard part file names and honor mapreduce.output.basename and
-     * is turned of for those that do not. Refer PIG-4649
+     * specific StoreFuncs. Optimization should be turned off for those
+     * StoreFuncs that hard code part file names and do not prefix file names
+     * with mapreduce.output.basename configuration.
+     *
+     * If the StoreFuncs implement
+     * {@link StoreFunc#supportsParallelWriteToStoreLocation()} and return true
+     * or false, then that value is used to turn union optimization on or off
+     * respectively. These settings can be used for StoreFuncs that have not
+     * implemented the API yet.
      */
     public static final String PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS = 
"pig.tez.opt.union.supported.storefuncs";
     public static final String PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS = 
"pig.tez.opt.union.unsupported.storefuncs";
@@ -127,6 +134,58 @@ public class PigConfiguration {
     public static final String PIG_SKEWEDJOIN_REDUCE_MEM = 
"pig.skewedjoin.reduce.mem";
 
     /**
+     * Bloom join has two different kinds of implementations.
+     * <ul>
+     * <li>map <br>
+     * In each map, bloom filters are computed on the join keys, partitioned by
+     * the hashcode of the key into {@link #PIG_BLOOMJOIN_NUM_FILTERS}
+     * partitions. Bloom filters from different maps are then combined in the
+     * reducer, producing one bloom filter per partition. This is efficient and
+     * fast if there is a small number of maps (<10) and the number of
+     * distinct keys is not too high. It can be faster with a larger number of
+     * maps and even with bigger bloom vector sizes, but the amount of data
+     * shuffled to the reducer for aggregation becomes huge, making it
+     * inefficient.</li>
+     * <li>reduce <br>
+     * Join keys are sent from the map to the reducer, partitioned by the hashcode of
+     * the key across {@link #PIG_BLOOMJOIN_NUM_FILTERS} reducers. One
+     * bloom filter is then created per partition. This is efficient for larger
+     * datasets with a lot of maps or a very large
+     * {@link #PIG_BLOOMJOIN_VECTORSIZE_BYTES}. In this case the size of the keys sent
+     * to the reducer is smaller than the size of the bloom filters that would otherwise
+     * be sent for aggregation, making it more efficient.</li>
+     * to the reducer is smaller than sending bloom filters to reducer for
+     * aggregation making it efficient.</li>
+     * </ul>
+     * Default value is map.
+     */
+    public static final String PIG_BLOOMJOIN_STRATEGY = 
"pig.bloomjoin.strategy";
+
+    /**
+     * The number of bloom filters that will be created.
+     * Default is 1 for map strategy and 11 for reduce strategy.
+     */
+    public static final String PIG_BLOOMJOIN_NUM_FILTERS = 
"pig.bloomjoin.num.filters";
+
+    /**
+     * The size in bytes of the bit vector to be used for the bloom filter.
+     * A bigger vector size will be needed when the number of distinct keys is 
higher.
+     * Default value is 1048576 (1MB).
+     */
+    public static final String PIG_BLOOMJOIN_VECTORSIZE_BYTES = 
"pig.bloomjoin.vectorsize.bytes";
+
+    /**
+     * The type of hash function to use. Valid values are jenkins and murmur.
+     * Default is murmur.
+     */
+    public static final String PIG_BLOOMJOIN_HASH_TYPE = 
"pig.bloomjoin.hash.type";
+
+    /**
+     * The number of hash functions to be used in bloom computation. It determines the probability of false positives.
+     * The higher the number, the lower the false positive rate. Too high a value can increase the CPU time.
+     * Default value is 3.
+     */
+    public static final String PIG_BLOOMJOIN_HASH_FUNCTIONS = 
"pig.bloomjoin.hash.functions";
+
+    /**
      * This key used to control the maximum size loaded into
      * the distributed cache when doing fragment-replicated join
      */
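
For illustration, a minimal sketch of selecting the reduce-side strategy through these keys; the relation names, paths and the USING 'bloom' join clause are illustrative assumptions (bloom join availability depends on the execution engine in use), not part of this patch.

import java.util.Properties;

import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;

public class BloomJoinExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Reduce-side strategy: preferable with many maps or a large bit vector.
        props.setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
        // 4 MB bit vector instead of the 1 MB default, for more distinct keys.
        props.setProperty(PigConfiguration.PIG_BLOOMJOIN_VECTORSIZE_BYTES,
                String.valueOf(4 * 1024 * 1024));

        PigServer pig = new PigServer(ExecType.LOCAL, props);
        pig.registerQuery("users = LOAD 'users' AS (id:long, name:chararray);");
        pig.registerQuery("clicks = LOAD 'clicks' AS (userid:long, url:chararray);");
        // Assumes a bloom join clause is available on the chosen engine.
        pig.registerQuery("joined = JOIN clicks BY userid, users BY id USING 'bloom';");
        pig.store("joined", "joined_out");
    }
}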
@@ -151,6 +210,12 @@ public class PigConfiguration {
      * This key is used to configure grace parallelism in tez. Default is true.
      */
     public static final String PIG_TEZ_GRACE_PARALLELISM = 
"pig.tez.grace.parallelism";
+    /**
+     * This key is used to turn off DAG recovery when auto parallelism is in use.
+     * Default is false. Useful when running with Tez versions before 0.8,
+     * which have issues with auto parallelism during DAG recovery.
+     */
+    public static final String PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY = 
"pig.tez.auto.parallelism.disable.dag.recovery";
 
     /**
      * This key is used to configure compression for the pig input splits which
@@ -324,17 +389,17 @@ public class PigConfiguration {
     /**
      * Boolean value used to enable or disable error handling for storers
      */
-    public static final String PIG_ALLOW_STORE_ERRORS = 
"pig.allow.store.errors";
+    public static final String PIG_ERROR_HANDLING_ENABLED = 
"pig.error-handling.enabled";
 
     /**
      * Controls the minimum number of errors
      */
-    public static final String PIG_ERRORS_MIN_RECORDS = 
"pig.errors.min.records";
+    public static final String PIG_ERROR_HANDLING_MIN_ERROR_RECORDS = 
"pig.error-handling.min.error.records";
 
     /**
      * Set the threshold for percentage of errors
      */
-    public static final String PIG_ERROR_THRESHOLD_PERCENT = 
"pig.error.threshold.percent";
+    public static final String PIG_ERROR_HANDLING_THRESHOLD_PERCENT = 
"pig.error-handling.error.threshold";
 
     /**
      * Comma-delimited entries of commands/operators that must be disallowed.
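
For illustration, a minimal sketch of enabling storer error handling with the renamed keys; the numeric values are placeholders, and their exact interpretation is defined by CounterBasedErrorHandler (see the change above), not by this sketch.

import java.util.Properties;

import org.apache.pig.PigConfiguration;

public class ErrorHandlingSettings {

    /** Builds properties that let stores tolerate a limited number of bad records. */
    public static Properties build() {
        Properties props = new Properties();
        // Turn on error handling for storers.
        props.setProperty(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, "true");
        // Minimum number of error records before the threshold is considered
        // (read as a long by CounterBasedErrorHandler).
        props.setProperty(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS, "100");
        // Error threshold, read as a float by CounterBasedErrorHandler;
        // "0.01" is purely illustrative here.
        props.setProperty(PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT, "0.01");
        return props;
    }
}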
@@ -411,6 +476,31 @@ public class PigConfiguration {
      */
     public static final String PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE = 
"pig.spill.unused.memory.threshold.size";
 
+    /**
+     * Log tracing id that can be used by upstream clients for tracking 
respective logs
+     */
+    public static final String PIG_LOG_TRACE_ID = "pig.log.trace.id";
+
+    /**
+     * @deprecated use {@link #PIG_LOG_TRACE_ID} instead. Will be removed in 
Pig 0.18
+     */
+    public static final String CALLER_ID = PIG_LOG_TRACE_ID;
+
+    /**
+     * Enable ATS for Pig
+     */
+    public static final String PIG_ATS_ENABLED = "pig.ats.enabled";
+
+    /**
+     * @deprecated use {@link #PIG_ATS_ENABLED} instead. Will be removed in 
Pig 0.18
+     */
+    public static final String ENABLE_ATS = PIG_ATS_ENABLED;
+
+    /**
+     * If set, Pig will override tez.am.launch.cmd-opts and 
tez.am.resource.memory.mb to optimal
+     */
+    public static final String PIG_TEZ_CONFIGURE_AM_MEMORY = 
"pig.tez.configure.am.memory";
+
     // Deprecated settings of Pig 0.13
 
     /**

Modified: pig/branches/spark/src/org/apache/pig/PigServer.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigServer.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigServer.java Wed Feb 22 09:43:41 
2017
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.io.StringReader;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -53,6 +55,7 @@ import org.apache.pig.backend.datastorag
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.backend.hadoop.PigATSClient;
 import org.apache.pig.backend.hadoop.executionengine.HJob;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.classification.InterfaceAudience;
@@ -241,6 +244,54 @@ public class PigServer {
         }
         PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
 
+        // log ATS event includes the caller context
+        String auditId = PigATSClient.getPigAuditId(pigContext);
+        String callerId = 
(String)pigContext.getProperties().get(PigConfiguration.PIG_LOG_TRACE_ID);
+        log.info("Pig Script ID for the session: " + auditId);
+        if (callerId != null) {
+            log.info("Caller ID for session: " + callerId);
+        }
+        if (Boolean.parseBoolean(pigContext.getProperties()
+                .getProperty(PigConfiguration.PIG_ATS_ENABLED))) {
+            if (Boolean.parseBoolean(pigContext.getProperties()
+                    .getProperty("yarn.timeline-service.enabled", "false"))) {
+                PigATSClient.ATSEvent event = new 
PigATSClient.ATSEvent(auditId, callerId);
+                try {
+                    PigATSClient.getInstance().logEvent(event);
+                } catch (Exception e) {
+                    log.warn("Error posting to ATS: ", e);
+                }
+            } else {
+                log.warn("ATS is disabled since"
+                        + " yarn.timeline-service.enabled set to false");
+            }
+
+        }
+
+        // set hdfs caller context
+        Class callerContextClass = null;
+        try {
+            callerContextClass = 
Class.forName("org.apache.hadoop.ipc.CallerContext");
+        } catch (ClassNotFoundException e) {
+            // If pre-Hadoop 2.8.0, skip setting CallerContext
+        }
+        if (callerContextClass != null) {
+            try {
+                // Reflection for the following code since it is only 
available since hadoop 2.8.0:
+                // CallerContext hdfsContext = new 
CallerContext.Builder(auditId).build();
+                // CallerContext.setCurrent(hdfsContext);
+                Class callerContextBuilderClass = 
Class.forName("org.apache.hadoop.ipc.CallerContext$Builder");
+                Constructor callerContextBuilderConstruct = 
callerContextBuilderClass.getConstructor(String.class);
+                Object builder = 
callerContextBuilderConstruct.newInstance(auditId);
+                Method builderBuildMethod = 
builder.getClass().getMethod("build");
+                Object hdfsContext = builderBuildMethod.invoke(builder);
+                Method callerContextSetCurrentMethod = 
callerContextClass.getMethod("setCurrent", hdfsContext.getClass());
+                callerContextSetCurrentMethod.invoke(callerContextClass, 
hdfsContext);
+            } catch (Exception e) {
+                // Shall not happen unless API change in future Hadoop commons
+                throw new ExecException(e);
+            }
+        }
     }
 
     private void addHadoopProperties() throws ExecException {
@@ -612,7 +663,8 @@ public class PigServer {
             pigContext.scriptingUDFs.put(path, namespace);
         }
 
-        File f = FileLocalizer.fetchFile(pigContext.getProperties(), 
path).file;
+        FetchFileRet ret = FileLocalizer.fetchFile(pigContext.getProperties(), 
path);
+        File f = ret.file;
         if (!f.canRead()) {
             int errCode = 4002;
             String msg = "Can't read file: " + path;
@@ -621,9 +673,19 @@ public class PigServer {
         }
         String cwd = new File(".").getCanonicalPath();
         String filePath = f.getCanonicalPath();
-        //Use the relative path in the jar, if the path specified is relative
-        String nameInJar = filePath.equals(cwd + File.separator + path) ?
-                filePath.substring(cwd.length() + 1) : filePath;
+        String nameInJar = filePath;
+        // Use the relative path in the jar, if the path specified is relative
+        if (!ret.didFetch) {
+            if (!new File(path).isAbsolute() && path.indexOf("." + 
File.separator) == -1) {
+                // In case of Oozie, the localized files are in a different
+                // directory symlinked to the current directory. Canonical 
path will not point to cwd.
+                nameInJar = path;
+            } else if (filePath.equals(cwd + File.separator + path)) {
+                // If user specified absolute path and it refers to cwd
+                nameInJar = filePath.substring(cwd.length() + 1);
+            }
+        }
+
         pigContext.addScriptFile(nameInJar, filePath);
         if(scriptingLang != null) {
             ScriptEngine se = ScriptEngine.getInstance(scriptingLang);

Modified: pig/branches/spark/src/org/apache/pig/StoreFunc.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/StoreFunc.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/StoreFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/StoreFunc.java Wed Feb 22 09:43:41 
2017
@@ -21,17 +21,14 @@ import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
-
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 
 /**
@@ -45,21 +42,21 @@ public abstract class StoreFunc implemen
     /**
      * This method is called by the Pig runtime in the front end to convert the
      * output location to an absolute path if the location is relative. The
-     * StoreFunc implementation is free to choose how it converts a relative 
+     * StoreFunc implementation is free to choose how it converts a relative
      * location to an absolute location since this may depend on what the 
location
-     * string represent (hdfs path or some other data source). 
-     *  
-     * 
+     * string represent (hdfs path or some other data source).
+     *
+     *
      * @param location location as provided in the "store" statement of the 
script
      * @param curDir the current working direction based on any "cd" statements
      * in the script before the "store" statement. If there are no "cd" 
statements
-     * in the script, this would be the home directory - 
+     * in the script, this would be the home directory -
      * <pre>/user/<username> </pre>
      * @return the absolute location based on the arguments passed
      * @throws IOException if the conversion is not possible
      */
     @Override
-    public String relToAbsPathForStoreLocation(String location, Path curDir) 
+    public String relToAbsPathForStoreLocation(String location, Path curDir)
     throws IOException {
         return LoadFunc.getAbsolutePath(location, curDir);
     }
@@ -67,32 +64,34 @@ public abstract class StoreFunc implemen
     /**
      * Return the OutputFormat associated with StoreFunc.  This will be called
      * on the front end during planning and on the backend during
-     * execution. 
+     * execution.
      * @return the {@link OutputFormat} associated with StoreFunc
-     * @throws IOException if an exception occurs while constructing the 
+     * @throws IOException if an exception occurs while constructing the
      * OutputFormat
      *
      */
+    @Override
     public abstract OutputFormat getOutputFormat() throws IOException;
 
     /**
-     * Communicate to the storer the location where the data needs to be 
stored.  
-     * The location string passed to the {@link StoreFunc} here is the 
-     * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, 
Path)} 
+     * Communicate to the storer the location where the data needs to be 
stored.
+     * The location string passed to the {@link StoreFunc} here is the
+     * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, 
Path)}
      * This method will be called in the frontend and backend multiple times. 
Implementations
      * should bear in mind that this method is called multiple times and should
      * ensure there are no inconsistent side effects due to the multiple calls.
      * {@link #checkSchema(ResourceSchema)} will be called before any call to
      * {@link #setStoreLocation(String, Job)}.
-     * 
+     *
 
-     * @param location Location returned by 
+     * @param location Location returned by
      * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
      * @param job The {@link Job} object
      * @throws IOException if the location is not valid.
      */
+    @Override
     public abstract void setStoreLocation(String location, Job job) throws 
IOException;
- 
+
     /**
      * Set the schema for data to be stored.  This will be called on the
      * front end during planning if the store is associated with a schema.
@@ -117,21 +116,23 @@ public abstract class StoreFunc implemen
      * @param writer RecordWriter to use.
      * @throws IOException if an exception occurs during initialization
      */
+    @Override
     public abstract void prepareToWrite(RecordWriter writer) throws 
IOException;
 
     /**
      * Write a tuple to the data store.
-     * 
+     *
      * @param t the tuple to store.
      * @throws IOException if an exception occurs during the write
      */
+    @Override
     public abstract void putNext(Tuple t) throws IOException;
-    
+
     /**
      * This method will be called by Pig both in the front end and back end to
      * pass a unique signature to the {@link StoreFunc} which it can use to 
store
      * information in the {@link UDFContext} which it needs to store between
-     * various method invocations in the front end and back end. This method 
+     * various method invocations in the front end and back end. This method
      * will be called before other methods in {@link StoreFunc}.  This is 
necessary
      * because in a Pig Latin script with multiple stores, the different
      * instances of store functions need to be able to find their (and only 
their)
@@ -142,21 +143,21 @@ public abstract class StoreFunc implemen
     public void setStoreFuncUDFContextSignature(String signature) {
         // default implementation is a no-op
     }
-    
+
     /**
      * This method will be called by Pig if the job which contains this store
      * fails. Implementations can clean up output locations in this method to
      * ensure that no incorrect/incomplete results are left in the output 
location.
      * The default implementation  deletes the output location if it
      * is a {@link FileSystem} location.
-     * @param location Location returned by 
+     * @param location Location returned by
      * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
-     * @param job The {@link Job} object - this should be used only to obtain 
+     * @param job The {@link Job} object - this should be used only to obtain
      * cluster properties through {@link Job#getConfiguration()} and not to 
set/query
-     * any runtime job information. 
+     * any runtime job information.
      */
     @Override
-    public void cleanupOnFailure(String location, Job job) 
+    public void cleanupOnFailure(String location, Job job)
     throws IOException {
         cleanupOnFailureImpl(location, job);
     }
@@ -166,19 +167,19 @@ public abstract class StoreFunc implemen
      * is successful, and some cleanup of intermediate resources is required.
      * Implementations can clean up output locations in this method to
      * ensure that no incorrect/incomplete results are left in the output 
location.
-     * @param location Location returned by 
+     * @param location Location returned by
      * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
-     * @param job The {@link Job} object - this should be used only to obtain 
+     * @param job The {@link Job} object - this should be used only to obtain
      * cluster properties through {@link Job#getConfiguration()} and not to 
set/query
-     * any runtime job information. 
+     * any runtime job information.
      */
     @Override
-    public void cleanupOnSuccess(String location, Job job) 
+    public void cleanupOnSuccess(String location, Job job)
     throws IOException {
         // DEFAULT: DO NOTHING, user-defined overrides can
         // call cleanupOnFailureImpl(location, job) or ...?
     }
-    
+
     /**
      * Default implementation for {@link #cleanupOnFailure(String, Job)}
      * and {@link #cleanupOnSuccess(String, Job)}.  This removes a file
@@ -187,15 +188,56 @@ public abstract class StoreFunc implemen
      * @param job Hadoop job, used to access the appropriate file system.
      * @throws IOException
      */
-    public static void cleanupOnFailureImpl(String location, Job job) 
-    throws IOException {        
+    public static void cleanupOnFailureImpl(String location, Job job)
+    throws IOException {
         Path path = new Path(location);
         FileSystem fs = path.getFileSystem(job.getConfiguration());
         if(fs.exists(path)){
             fs.delete(path, true);
-        }    
+        }
+    }
+
+    // TODO When dropping support for JDK 7 move this as a default method to 
StoreFuncInterface
+    /**
+     * DAG execution engines like Tez support optimizing union by writing to the
+     * output location in parallel from tasks of different vertices. Commit is
+     * called once all the vertices in the union are complete. This eliminates the
+     * need for a separate phase that reads the data output from the previous phases,
+     * unions it and writes it out again.
+     *
+     * Enabling the union optimization requires the OutputFormat to
+     *
+     *      1) Support creation of different part file names for tasks of different
+     * vertices. Conflicting filenames can cause data corruption and loss.
+     * For example, if task 0 of vertex1 and vertex2 both create a file named
+     * part-r-00000, then one of the files will be overwritten when promoting
+     * from the temporary to the final location, leading to data loss.
+     *      FileOutputFormat has the mapreduce.output.basename config, which enables
+     *  naming files differently in different vertices. Classes extending
+     *  FileOutputFormat, and those prefixing file names with the mapreduce.output.basename
+     *  value, will not encounter a conflict. Cases like HBaseStorage, which writes to a
+     *  key value store and does not produce files, should not face any conflict either.
+     *
+     *      2) Support a single commit at the end that takes care of promoting the
+     * temporary files of the different vertices into the final location.
+     * For example, the FileOutputFormat commit algorithm promotes files produced
+     * by tasks of different vertices into the final output location without issues
+     * if there is no file name conflict. In cases like HBaseStorage, the
+     * TableOutputCommitter does nothing on commit.
+     *
+     * If the custom OutputFormat used by the StoreFunc does not support the above
+     * two criteria, then false should be returned. Union optimization will be
+     * disabled for the StoreFunc.
+     *
+     * The default implementation returns null, in which case the planner falls back
+     * to the {@link PigConfiguration#PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS} and
+     * {@link PigConfiguration#PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS}
+     * settings to determine whether the StoreFunc supports it.
+     */
+    public Boolean supportsParallelWriteToStoreLocation() {
+        return null;
     }
-    
+
     /**
      * Issue a warning.  Warning messages are aggregated and reported to
      * the user.
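
For illustration, a minimal sketch of a storer opting in to the optimization; ParallelFriendlyStorer is hypothetical, and the stubbed methods are placeholders rather than a working storer.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.StoreFunc;
import org.apache.pig.data.Tuple;

// Hypothetical storer, not part of this patch.
public class ParallelFriendlyStorer extends StoreFunc {

    private RecordWriter writer;

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        // TextOutputFormat names part files using mapreduce.output.basename and
        // uses FileOutputCommitter, so it satisfies both criteria above.
        return new TextOutputFormat<Object, Object>();
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        TextOutputFormat.setOutputPath(job, new Path(location));
    }

    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    @Override
    public void putNext(Tuple t) throws IOException {
        // Writing of the tuple is omitted in this sketch.
    }

    @Override
    public Boolean supportsParallelWriteToStoreLocation() {
        // Explicitly declare support so the Tez union optimization stays enabled
        // without relying on the supported/unsupported storefunc lists.
        return Boolean.TRUE;
    }
}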

Modified: pig/branches/spark/src/org/apache/pig/StreamToPig.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/StreamToPig.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/StreamToPig.java (original)
+++ pig/branches/spark/src/org/apache/pig/StreamToPig.java Wed Feb 22 09:43:41 
2017
@@ -57,7 +57,7 @@ public interface StreamToPig {
     public Tuple deserialize(byte[] bytes) throws IOException;
 
     /**
-     * This will be called on the front end during planning and not on the back
+     * This will be called on both the front end and the back
      * end during execution.
      *
      * @return the {@link LoadCaster} associated with this object, or null if

Modified: 
pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
 Wed Feb 22 09:43:41 2017
@@ -183,6 +183,14 @@ public interface ExecutionEngine {
     public ExecutableManager getExecutableManager();
 
     /**
+     * This method is called when user requests to kill all jobs
+     * associated with the execution engine
+     *
+     * @throws BackendException
+     */
+    public void kill() throws BackendException;
+
+    /**
      * This method is called when a user requests to kill a job associated with
      * the given job id. If it is not possible for a user to kill a job, throw 
a
      * exception. It is imperative for the job id's being displayed to be 
unique

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java 
(added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java Wed 
Feb 22 09:43:41 2017
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.ScriptState;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class PigATSClient {
+    public static class ATSEvent {
+        public ATSEvent(String pigAuditId, String callerId) {
+            this.pigScriptId = pigAuditId;
+            this.callerId = callerId;
+        }
+        String callerId;
+        String pigScriptId;
+    }
+    public static final String ENTITY_TYPE = "PIG_SCRIPT_ID";
+    public static final String ENTITY_CALLERID = "callerId";
+    public static final String CALLER_CONTEXT = "PIG";
+    public static final int AUDIT_ID_MAX_LENGTH = 128;
+
+    private static final Log log = 
LogFactory.getLog(PigATSClient.class.getName());
+    private static PigATSClient instance;
+    private static ExecutorService executor;
+    private TimelineClient timelineClient;
+
+    public static synchronized PigATSClient getInstance() {
+        if (instance==null) {
+            instance = new PigATSClient();
+        }
+        return instance;
+    }
+
+    private PigATSClient() {
+        if (executor == null) {
+            executor = Executors.newSingleThreadExecutor(
+                    new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build());
+            YarnConfiguration yarnConf = new YarnConfiguration();
+            timelineClient = TimelineClient.createTimelineClient();
+            timelineClient.init(yarnConf);
+            timelineClient.start();
+        }
+        Utils.addShutdownHookWithPriority(new Runnable() {
+            @Override
+            public void run() {
+                timelineClient.stop();
+                executor.shutdownNow();
+                executor = null;
+            }
+        }, PigImplConstants.SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY);
+        log.info("Created ATS Hook");
+    }
+
+    public static String getPigAuditId(PigContext context) {
+        String auditId;
+        if (context.getProperties().get(PigImplConstants.PIG_AUDIT_ID) != 
null) {
+            auditId = 
(String)context.getProperties().get(PigImplConstants.PIG_AUDIT_ID);
+        } else {
+            ScriptState ss = ScriptState.get();
+            String filename = ss.getFileName().isEmpty()?"default" : new 
File(ss.getFileName()).getName();
+            auditId = CALLER_CONTEXT + "-" + filename + "-" + ss.getId();
+        }
+        return auditId.substring(0, Math.min(auditId.length(), 
AUDIT_ID_MAX_LENGTH));
+    }
+
+    synchronized public void logEvent(final ATSEvent event) {
+        executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                TimelineEntity entity = new TimelineEntity();
+                entity.setEntityId(event.pigScriptId);
+                entity.setEntityType(ENTITY_TYPE);
+                entity.addPrimaryFilter(ENTITY_CALLERID, 
event.callerId!=null?event.callerId : "default");
+                try {
+                    timelineClient.putEntities(entity);
+                } catch (Exception e) {
+                    log.info("Failed to submit plan to ATS: " + 
e.getMessage());
+                }
+            }
+        });
+    }
+}

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java 
(added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java Wed 
Feb 22 09:43:41 2017
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
+
+/**
+ * Extends the Hadoop JobControl to remove the hardcoded sleep(5000).
+ * As most of the relevant state is private, we have to use reflection.
+ *
+ * See {@link https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java }
+ *
+ */
+public class PigJobControl extends JobControl {
+  private static final Log log = LogFactory.getLog(PigJobControl.class);
+
+  private static Field runnerState;
+  private static Field jobsInProgress;
+  private static Field successfulJobs;
+  private static Field failedJobs;
+
+  private static Method failAllJobs;
+
+  private static Method checkState;
+  private static Method submit;
+
+  private static boolean initSuccesful;
+
+  static {
+    try {
+
+      runnerState = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("runnerState");
+      runnerState.setAccessible(true);
+      jobsInProgress = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("jobsInProgress");
+      jobsInProgress.setAccessible(true);
+      successfulJobs = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("successfulJobs");
+      successfulJobs.setAccessible(true);
+      failedJobs = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("failedJobs");
+      failedJobs.setAccessible(true);
+
+      failAllJobs = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredMethod("failAllJobs", Throwable.class);
+      failAllJobs.setAccessible(true);
+
+      checkState = ControlledJob.class.getDeclaredMethod("checkState");
+      checkState.setAccessible(true);
+      submit = ControlledJob.class.getDeclaredMethod("submit");
+      submit.setAccessible(true);
+
+      initSuccesful = true;
+    } catch (Exception e) {
+      log.debug("falling back to default JobControl (not using hadoop 0.23 ?)", e);
+      initSuccesful = false;
+    }
+  }
+
+  protected int timeToSleep;
+
+  /**
+   * Construct a job control for a group of jobs.
+   * @param groupName a name identifying this group
+   * @param timeToSleep the polling interval, in milliseconds, between job state checks
+   */
+  public PigJobControl(String groupName, int timeToSleep) {
+    super(groupName);
+    this.timeToSleep = timeToSleep;
+  }
+
+  public int getTimeToSleep() {
+    return timeToSleep;
+  }
+
+  public void setTimeToSleep(int timeToSleep) {
+    this.timeToSleep = timeToSleep;
+  }
+
+  private void setRunnerState(ThreadState state) {
+    try {
+      runnerState.set(this, state);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  private ThreadState getRunnerState() {
+    try {
+      return (ThreadState)runnerState.get(this);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private State checkState(ControlledJob j) {
+    try {
+      return (State)checkState.invoke(j);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private State submit(ControlledJob j) {
+    try {
+      return (State)submit.invoke(j);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private LinkedList<ControlledJob> getJobs(Field field) {
+    try {
+      return (LinkedList<ControlledJob>)field.get(this);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void failAllJobs(Throwable t) {
+    try {
+      failAllJobs.invoke(this, t);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   *  The main loop for the thread.
+   *  The loop does the following:
+   *    Check the states of the running jobs
+   *    Update the states of waiting jobs
+   *    Submit the jobs in ready state
+   */
+  public void run() {
+    if (!initSuccesful) {
+      super.run();
+      return;
+    }
+    try {
+      setRunnerState(ThreadState.RUNNING);
+      while (true) {
+        while (getRunnerState() == ThreadState.SUSPENDED) {
+          try {
+            Thread.sleep(timeToSleep);
+          }
+          catch (Exception e) {
+            //TODO the thread was interrupted, do something!!!
+          }
+        }
+
+        synchronized(this) {
+          Iterator<ControlledJob> it = getJobs(jobsInProgress).iterator();
+          if (!it.hasNext()) {
+              stop();
+          }
+          while(it.hasNext()) {
+            ControlledJob j = it.next();
+
+            // TODO: Need to re-visit the following try...catch
+            // when Pig picks up a Hadoop release with MAPREDUCE-6762 applied
+            // as its dependency.
+            try {
+              log.debug("Checking state of job " + j);
+            } catch(NullPointerException npe) {
+              log.warn("Failed to get job name " +
+                "when checking state of job. " +
+                "Check if job status is null.", npe);
+            }
+
+            switch(checkState(j)) {
+            case SUCCESS:
+              getJobs(successfulJobs).add(j);
+              it.remove();
+              break;
+            case FAILED:
+            case DEPENDENT_FAILED:
+              getJobs(failedJobs).add(j);
+              it.remove();
+              break;
+            case READY:
+              submit(j);
+              break;
+            case RUNNING:
+            case WAITING:
+              //Do Nothing
+              break;
+            }
+          }
+        }
+
+        if (getRunnerState() != ThreadState.RUNNING &&
+            getRunnerState() != ThreadState.SUSPENDED) {
+          break;
+        }
+        try {
+          Thread.sleep(timeToSleep);
+        }
+        catch (Exception e) {
+          //TODO the thread was interrupted, do something!!!
+        }
+        if (getRunnerState() != ThreadState.RUNNING &&
+            getRunnerState() != ThreadState.SUSPENDED) {
+          break;
+        }
+      }
+    }catch(Throwable t) {
+      log.error("Error while trying to run jobs.",t);
+      //Mark all jobs as failed because we got something bad.
+      failAllJobs(t);
+    }
+    setRunnerState(ThreadState.STOPPED);
+  }
+
+
+}
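
A short, illustrative sketch of how a launcher might drive this class with a configurable poll interval instead of the stock JobControl's fixed 5000 ms sleep; the group name and interval are made-up values, and jobs would normally be registered through the inherited addJob(...) before starting the thread.

import org.apache.pig.backend.hadoop.PigJobControl;

public class PigJobControlSketch {
    public static void main(String[] args) throws InterruptedException {
        // 100 ms poll interval; falls back to the stock JobControl loop if the
        // reflective lookups in the static initializer above did not succeed.
        PigJobControl jobControl = new PigJobControl("pig-example-group", 100);

        Thread runner = new Thread(jobControl, "PigJobControl");
        runner.setDaemon(true);
        runner.start();                      // enters the run() loop shown above

        while (!jobControl.allFinished()) {  // inherited from JobControl
            Thread.sleep(jobControl.getTimeToSleep());
        }
        jobControl.stop();
    }
}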

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java
 Wed Feb 22 09:43:41 2017
@@ -17,8 +17,6 @@
 package org.apache.pig.backend.hadoop.accumulo;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Collection;
@@ -303,24 +301,8 @@ public abstract class AbstractAccumuloSt
      */
     protected void simpleUnset(Configuration conf,
             Map<String, String> entriesToUnset) {
-        try {
-            Method unset = conf.getClass().getMethod("unset", String.class);
-
-            for (String key : entriesToUnset.keySet()) {
-                unset.invoke(conf, key);
-            }
-        } catch (NoSuchMethodException e) {
-            log.error("Could not invoke Configuration.unset method", e);
-            throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-            log.error("Could not invoke Configuration.unset method", e);
-            throw new RuntimeException(e);
-        } catch (IllegalArgumentException e) {
-            log.error("Could not invoke Configuration.unset method", e);
-            throw new RuntimeException(e);
-        } catch (InvocationTargetException e) {
-            log.error("Could not invoke Configuration.unset method", e);
-            throw new RuntimeException(e);
+        for (String key : entriesToUnset.keySet()) {
+            conf.unset(key);
         }
     }
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java 
(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java 
Wed Feb 22 09:43:41 2017
@@ -22,8 +22,6 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLDecoder;
 import java.text.MessageFormat;
@@ -42,6 +40,7 @@ import java.util.zip.ZipOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Logger;
 
@@ -112,7 +111,7 @@ public class Utils {
         // attempt to locate an existing jar for the class.
         String jar = findContainingJar(my_class, packagedClasses);
         if (null == jar || jar.isEmpty()) {
-            jar = getJar(my_class);
+            jar = JarFinder.getJar(my_class);
             updateMap(jar, packagedClasses);
         }
 
@@ -200,41 +199,6 @@ public class Utils {
     }
 
     /**
-     * Invoke 'getJar' on a JarFinder implementation. Useful for some job
-     * configuration contexts (HBASE-8140) and also for testing on MRv2. First
-     * check if we have HADOOP-9426. Lacking that, fall back to the backport.
-     * 
-     * @param my_class
-     *            the class to find.
-     * @return a jar file that contains the class, or null.
-     */
-    private static String getJar(Class<?> my_class) {
-        String ret = null;
-        String hadoopJarFinder = "org.apache.hadoop.util.JarFinder";
-        Class<?> jarFinder = null;
-        try {
-            log.debug("Looking for " + hadoopJarFinder + ".");
-            jarFinder = Class.forName(hadoopJarFinder);
-            log.debug(hadoopJarFinder + " found.");
-            Method getJar = jarFinder.getMethod("getJar", Class.class);
-            ret = (String) getJar.invoke(null, my_class);
-        } catch (ClassNotFoundException e) {
-            log.debug("Using backported JarFinder.");
-            ret = jarFinderGetJar(my_class);
-        } catch (InvocationTargetException e) {
-            // function was properly called, but threw it's own exception.
-            // Unwrap it
-            // and pass it on.
-            throw new RuntimeException(e.getCause());
-        } catch (Exception e) {
-            // toss all other exceptions, related to reflection failure
-            throw new RuntimeException("getJar invocation failed.", e);
-        }
-
-        return ret;
-    }
-
-    /**
      * Returns the full path to the Jar containing the class. It always return 
a
      * JAR.
      * 

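The reflective probe removed above existed only to tolerate Hadoop versions that lacked JarFinder (HADOOP-9426); on the Hadoop 2 line this branch targets the class is always on the classpath, so the direct call is enough. A minimal sketch, with the class argument chosen arbitrarily:

import org.apache.hadoop.util.JarFinder;

public class JarFinderSketch {
    public static void main(String[] args) {
        // Returns the path of a jar containing the class, building one on the fly
        // if the class is only available as a directory of .class files.
        String jar = JarFinder.getJar(JarFinderSketch.class);
        System.out.println(jar);
    }
}
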
Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
 Wed Feb 22 09:43:41 2017
@@ -29,7 +29,6 @@ import org.apache.pig.ExecType;
 import org.apache.pig.PigConstants;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 
 public class ConfigurationUtil {
@@ -89,7 +88,7 @@ public class ConfigurationUtil {
             // so build/classes/hadoop-site.xml contains such entry. This 
prevents some tests from
             // successful (They expect those files in hdfs), so we need to 
unset it in hadoop 23.
             // This should go away once MiniMRCluster fix the distributed 
cache issue.
-            HadoopShims.unsetConf(localConf, MRConfiguration.JOB_CACHE_FILES);
+            localConf.unset(MRConfiguration.JOB_CACHE_FILES);
         }
         localConf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
         Properties props = ConfigurationUtil.toProperties(localConf);
@@ -106,4 +105,14 @@ public class ConfigurationUtil {
             }
         }
     }
+
+    /**
+     * Returns a Properties object containing all alternative names of the given
+     * property (for example a deprecated key and its current replacement), each
+     * mapped to the same value. Can be used to resolve Hadoop property deprecations.
+     * @param name the property name to expand
+     * @param value the property value
+     * @return properties holding the value under every known name of the property
+     */
+    public static Properties expandForAlternativeNames(String name, String value) {
+        final Configuration config = new Configuration(false);
+        config.set(name,value);
+        return ConfigurationUtil.toProperties(config);
+    }
 }
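
A hedged example of what the new helper is expected to produce. The specific key pair is an assumption based on the fs.default.name / fs.defaultFS handling elsewhere in this commit, and the commented values are the expected result of Hadoop's deprecation mapping, not captured output.

import java.util.Properties;

import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;

public class DeprecationExpansionSketch {
    public static void main(String[] args) {
        // Round-trips the key/value through a Hadoop Configuration, which applies
        // its deprecation table, so the value should come back under both the
        // deprecated name and its current replacement.
        Properties expanded =
                ConfigurationUtil.expandForAlternativeNames("fs.default.name", "hdfs://namenode:8020");

        System.out.println(expanded.getProperty("fs.default.name")); // hdfs://namenode:8020 (expected)
        System.out.println(expanded.getProperty("fs.defaultFS"));    // hdfs://namenode:8020 (expected)
    }
}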

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
 Wed Feb 22 09:43:41 2017
@@ -18,20 +18,20 @@
 
 package org.apache.pig.backend.hadoop.datastorage;
 
-import java.net.URI;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
 import java.util.Enumeration;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
@@ -40,8 +40,6 @@ import org.apache.pig.backend.datastorag
 
 public class HDataStorage implements DataStorage {
 
-    private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
-
     private FileSystem fs;
     private Configuration configuration;
     private Properties properties;
@@ -58,9 +56,10 @@ public class HDataStorage implements Dat
         init();
     }
 
+    @Override
     public void init() {
         // check if name node is set, if not we set local as fail back
-        String nameNode = this.properties.getProperty(FILE_SYSTEM_LOCATION);
+        String nameNode = this.properties.getProperty(FileSystem.FS_DEFAULT_NAME_KEY);
         if (nameNode == null || nameNode.length() == 0) {
             nameNode = "local";
         }
@@ -76,14 +75,17 @@ public class HDataStorage implements Dat
         }
     }
 
+    @Override
     public void close() throws IOException {
         fs.close();
     }
-    
+
+    @Override
     public Properties getConfiguration() {
         return this.properties;
     }
 
+    @Override
     public void updateConfiguration(Properties newConfiguration)
             throws DataStorageException {
         // TODO sgroschupf 25Feb2008 this method is never called and
@@ -92,38 +94,40 @@ public class HDataStorage implements Dat
         if (newConfiguration == null) {
             return;
         }
-        
+
         Enumeration<Object> newKeys = newConfiguration.keys();
-        
+
         while (newKeys.hasMoreElements()) {
             String key = (String) newKeys.nextElement();
             String value = null;
-            
+
             value = newConfiguration.getProperty(key);
-            
+
             fs.getConf().set(key,value);
         }
     }
-    
+
+    @Override
     public Map<String, Object> getStatistics() throws IOException {
         Map<String, Object> stats = new HashMap<String, Object>();
 
         long usedBytes = fs.getUsed();
         stats.put(USED_BYTES_KEY , Long.valueOf(usedBytes).toString());
-        
+
         if (fs instanceof DistributedFileSystem) {
             DistributedFileSystem dfs = (DistributedFileSystem) fs;
-            
+
             long rawCapacityBytes = dfs.getRawCapacity();
             stats.put(RAW_CAPACITY_KEY, 
Long.valueOf(rawCapacityBytes).toString());
-            
+
             long rawUsedBytes = dfs.getRawUsed();
             stats.put(RAW_USED_KEY, Long.valueOf(rawUsedBytes).toString());
         }
-        
+
         return stats;
     }
-    
+
+    @Override
     public ElementDescriptor asElement(String name) throws 
DataStorageException {
         if (this.isContainer(name)) {
             return new HDirectory(this, name);
@@ -132,70 +136,82 @@ public class HDataStorage implements Dat
             return new HFile(this, name);
         }
     }
-    
+
+    @Override
     public ElementDescriptor asElement(ElementDescriptor element)
             throws DataStorageException {
         return asElement(element.toString());
     }
-    
+
+    @Override
     public ElementDescriptor asElement(String parent,
-                                                  String child) 
+                                                  String child)
             throws DataStorageException {
         return asElement((new Path(parent, child)).toString());
     }
 
+    @Override
     public ElementDescriptor asElement(ContainerDescriptor parent,
-                                                  String child) 
+                                                  String child)
             throws DataStorageException {
         return asElement(parent.toString(), child);
     }
 
+    @Override
     public ElementDescriptor asElement(ContainerDescriptor parent,
-                                                  ElementDescriptor child) 
+                                                  ElementDescriptor child)
             throws DataStorageException {
         return asElement(parent.toString(), child.toString());
     }
 
-    public ContainerDescriptor asContainer(String name) 
+    @Override
+    public ContainerDescriptor asContainer(String name)
             throws DataStorageException {
         return new HDirectory(this, name);
     }
-    
+
+    @Override
     public ContainerDescriptor asContainer(ContainerDescriptor container)
             throws DataStorageException {
         return new HDirectory(this, container.toString());
     }
-    
+
+    @Override
     public ContainerDescriptor asContainer(String parent,
-                                                      String child) 
+                                                      String child)
             throws DataStorageException {
         return new HDirectory(this, parent, child);
     }
 
+    @Override
     public ContainerDescriptor asContainer(ContainerDescriptor parent,
-                                                      String child) 
+                                                      String child)
             throws DataStorageException {
         return new HDirectory(this, parent.toString(), child);
     }
-    
+
+    @Override
     public ContainerDescriptor asContainer(ContainerDescriptor parent,
                                                       ContainerDescriptor 
child)
             throws DataStorageException {
         return new HDirectory(this, parent.toString(), child.toString());
     }
-    
+
+    @Override
     public void setActiveContainer(ContainerDescriptor container) {
         fs.setWorkingDirectory(new Path(container.toString()));
     }
-    
+
+    @Override
     public ContainerDescriptor getActiveContainer() {
         return new HDirectory(this, fs.getWorkingDirectory());
     }
 
+    @Override
     public boolean isContainer(String name) throws DataStorageException {
         boolean isContainer = false;
         Path path = new Path(name);
-        
+
         try {
             if ((this.fs.exists(path)) && (! this.fs.isFile(path))) {
                 isContainer = true;
@@ -206,10 +222,11 @@ public class HDataStorage implements Dat
             String msg = "Unable to check name " + name;
             throw new DataStorageException(msg, errCode, 
PigException.REMOTE_ENVIRONMENT, e);
         }
-        
+
         return isContainer;
     }
-    
+
+    @Override
     public HPath[] asCollection(String pattern) throws DataStorageException {
         try {
             FileStatus[] paths = this.fs.globStatus(new Path(pattern));
@@ -218,7 +235,7 @@ public class HDataStorage implements Dat
                 return new HPath[0];
 
             List<HPath> hpaths = new ArrayList<HPath>();
-            
+
             for (int i = 0; i < paths.length; ++i) {
                 HPath hpath = 
(HPath)this.asElement(paths[i].getPath().toString());
                 if (!hpath.systemElement()) {
@@ -233,7 +250,7 @@ public class HDataStorage implements Dat
             throw new DataStorageException(msg, errCode, 
PigException.REMOTE_ENVIRONMENT, e);
         }
     }
-    
+
     public FileSystem getHFS() {
         return fs;
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
 Wed Feb 22 09:43:41 2017
@@ -30,6 +30,7 @@ import java.util.Properties;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.PigException;
@@ -76,8 +77,6 @@ public abstract class HExecutionEngine i
     public static final String MAPRED_DEFAULT_SITE = "mapred-default.xml";
     public static final String YARN_DEFAULT_SITE = "yarn-default.xml";
 
-    public static final String FILE_SYSTEM_LOCATION = "fs.default.name";
-    public static final String ALTERNATIVE_FILE_SYSTEM_LOCATION = 
"fs.defaultFS";
     public static final String LOCAL = "local";
 
     protected PigContext pigContext;
@@ -203,8 +202,8 @@ public abstract class HExecutionEngine i
                 properties.setProperty(MRConfiguration.FRAMEWORK_NAME, LOCAL);
             }
             properties.setProperty(MRConfiguration.JOB_TRACKER, LOCAL);
-            properties.setProperty(FILE_SYSTEM_LOCATION, "file:///");
-            properties.setProperty(ALTERNATIVE_FILE_SYSTEM_LOCATION, 
"file:///");
+            properties.remove("fs.default.name"); //Deprecated in Hadoop 2.x
+            properties.setProperty(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
 
             jc = getLocalConf();
             JobConf s3Jc = getS3Conf();
@@ -220,24 +219,7 @@ public abstract class HExecutionEngine i
         HKerberos.tryKerberosKeytabLogin(jc);
 
         cluster = jc.get(MRConfiguration.JOB_TRACKER);
-        nameNode = jc.get(FILE_SYSTEM_LOCATION);
-        if (nameNode == null) {
-            nameNode = (String) 
pigContext.getProperties().get(ALTERNATIVE_FILE_SYSTEM_LOCATION);
-        }
-
-        if (cluster != null && cluster.length() > 0) {
-            if (!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) {
-                cluster = cluster + ":50020";
-            }
-            properties.setProperty(MRConfiguration.JOB_TRACKER, cluster);
-        }
-
-        if (nameNode != null && nameNode.length() > 0) {
-            if (!nameNode.contains(":") && !nameNode.equalsIgnoreCase(LOCAL)) {
-                nameNode = nameNode + ":8020";
-            }
-            properties.setProperty(FILE_SYSTEM_LOCATION, nameNode);
-        }
+        nameNode = jc.get(FileSystem.FS_DEFAULT_NAME_KEY);
 
         LOG.info("Connecting to hadoop file system at: "
                 + (nameNode == null ? LOCAL : nameNode));
@@ -369,7 +351,11 @@ public abstract class HExecutionEngine i
     @Override
     public void setProperty(String property, String value) {
         Properties properties = pigContext.getProperties();
-        properties.put(property, value);
+        if (Configuration.isDeprecated(property)) {
+            properties.putAll(ConfigurationUtil.expandForAlternativeNames(property, value));
+        } else {
+            properties.put(property, value);
+        }
     }
 
     @Override
@@ -378,6 +364,13 @@ public abstract class HExecutionEngine i
     }
 
     @Override
+    public void kill() throws BackendException {
+        if (launcher != null) {
+            launcher.kill();
+        }
+    }
+
+    @Override
     public void killJob(String jobID) throws BackendException {
         if (launcher != null) {
             launcher.killJob(jobID, getJobConf());
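
One behavioural point worth illustrating: the engine now reads the default filesystem via FileSystem.FS_DEFAULT_NAME_KEY and no longer appends default ports (":50020" for the job tracker, ":8020" for the name node) to bare host names, so configurations are expected to carry fully qualified URIs. A small sketch; the host name is made up.

import java.util.Properties;

import org.apache.hadoop.fs.FileSystem;

public class FsDefaultSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Previously a bare host would have been suffixed with ":8020"; now the
        // full URI should be supplied up front.
        props.setProperty(FileSystem.FS_DEFAULT_NAME_KEY, "hdfs://namenode.example.com:8020");
        System.out.println(props.getProperty("fs.defaultFS")); // FS_DEFAULT_NAME_KEY resolves to "fs.defaultFS"
    }
}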

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java 
(original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java 
Wed Feb 22 09:43:41 2017
@@ -40,7 +40,7 @@ import org.apache.pig.tools.pigstats.Pig
 public class HJob implements ExecJob {
 
     private final Log log = LogFactory.getLog(getClass());
-    
+
     protected JOB_STATUS status;
     protected PigContext pigContext;
     protected FileSpec outFileSpec;
@@ -48,7 +48,7 @@ public class HJob implements ExecJob {
     protected String alias;
     protected POStore poStore;
     private PigStats stats;
-    
+
     public HJob(JOB_STATUS status,
                 PigContext pigContext,
                 POStore store,
@@ -59,7 +59,7 @@ public class HJob implements ExecJob {
         this.outFileSpec = poStore.getSFile();
         this.alias = alias;
     }
-    
+
     public HJob(JOB_STATUS status,
             PigContext pigContext,
             POStore store,
@@ -72,37 +72,41 @@ public class HJob implements ExecJob {
         this.alias = alias;
         this.stats = stats;
     }
-    
+
+    @Override
     public JOB_STATUS getStatus() {
         return status;
     }
-    
+
+    @Override
     public boolean hasCompleted() throws ExecException {
         return true;
     }
-    
+
+    @Override
     public Iterator<Tuple> getResults() throws ExecException {
         final LoadFunc p;
-        
+
         try{
-             LoadFunc originalLoadFunc = 
+             LoadFunc originalLoadFunc =
                  (LoadFunc)PigContext.instantiateFuncFromSpec(
                          outFileSpec.getFuncSpec());
-             
-             p = (LoadFunc) new ReadToEndLoader(originalLoadFunc, 
+
+             p = (LoadFunc) new ReadToEndLoader(originalLoadFunc,
                      ConfigurationUtil.toConfiguration(
-                     pigContext.getProperties()), outFileSpec.getFileName(), 
0, pigContext);
+                     pigContext.getProperties()), outFileSpec.getFileName(), 
0);
 
         }catch (Exception e){
             int errCode = 2088;
             String msg = "Unable to get results for: " + outFileSpec;
             throw new ExecException(msg, errCode, PigException.BUG, e);
         }
-        
+
         return new Iterator<Tuple>() {
             Tuple   t;
             boolean atEnd;
 
+            @Override
             public boolean hasNext() {
                 if (atEnd)
                     return false;
@@ -120,6 +124,7 @@ public class HJob implements ExecJob {
                 return !atEnd;
             }
 
+            @Override
             public Tuple next() {
                 Tuple next = t;
                 if (next != null) {
@@ -136,6 +141,7 @@ public class HJob implements ExecJob {
                 return next;
             }
 
+            @Override
             public void remove() {
                 throw new RuntimeException("Removal not supported");
             }
@@ -143,31 +149,38 @@ public class HJob implements ExecJob {
         };
     }
 
+    @Override
     public Properties getConfiguration() {
         return pigContext.getProperties();
     }
 
+    @Override
     public PigStats getStatistics() {
         //throw new UnsupportedOperationException();
         return stats;
     }
 
+    @Override
     public void completionNotification(Object cookie) {
         throw new UnsupportedOperationException();
     }
-    
+
+    @Override
     public void kill() throws ExecException {
         throw new UnsupportedOperationException();
     }
-    
+
+    @Override
     public void getLogs(OutputStream log) throws ExecException {
         throw new UnsupportedOperationException();
     }
-    
+
+    @Override
     public void getSTDOut(OutputStream out) throws ExecException {
         throw new UnsupportedOperationException();
     }
-    
+
+    @Override
     public void getSTDError(OutputStream error) throws ExecException {
         throw new UnsupportedOperationException();
     }
@@ -176,6 +189,7 @@ public class HJob implements ExecJob {
         backendException = e;
     }
 
+    @Override
     public Exception getException() {
         return backendException;
     }

