Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.ScriptState;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class PigATSClient {
+    public static class ATSEvent {
+        public ATSEvent(String pigAuditId, String callerId) {
+            this.pigScriptId = pigAuditId;
+            this.callerId = callerId;
+        }
+        String callerId;
+        String pigScriptId;
+    }
+    public static final String ENTITY_TYPE = "PIG_SCRIPT_ID";
+    public static final String ENTITY_CALLERID = "callerId";
+    public static final String CALLER_CONTEXT = "PIG";
+    public static final int AUDIT_ID_MAX_LENGTH = 128;
+
+    private static final Log log = LogFactory.getLog(PigATSClient.class.getName());
+    private static PigATSClient instance;
+    private static ExecutorService executor;
+    private TimelineClient timelineClient;
+
+    public static synchronized PigATSClient getInstance() {
+        if (instance==null) {
+            instance = new PigATSClient();
+        }
+        return instance;
+    }
+
+    private PigATSClient() {
+        if (executor == null) {
+            executor = Executors.newSingleThreadExecutor(
+                    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build());
+            YarnConfiguration yarnConf = new YarnConfiguration();
+            timelineClient = TimelineClient.createTimelineClient();
+            timelineClient.init(yarnConf);
+            timelineClient.start();
+        }
+        Utils.addShutdownHookWithPriority(new Runnable() {
+            @Override
+            public void run() {
+                timelineClient.stop();
+                executor.shutdownNow();
+                executor = null;
+            }
+        }, PigImplConstants.SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY);
+        log.info("Created ATS Hook");
+    }
+
+    public static String getPigAuditId(PigContext context) {
+        String auditId;
+        if (context.getProperties().get(PigImplConstants.PIG_AUDIT_ID) != null) {
+            auditId = (String)context.getProperties().get(PigImplConstants.PIG_AUDIT_ID);
+        } else {
+            ScriptState ss = ScriptState.get();
+            String filename = ss.getFileName().isEmpty()?"default" : new File(ss.getFileName()).getName();
+            auditId = CALLER_CONTEXT + "-" + filename + "-" + ss.getId();
+        }
+        return auditId.substring(0, Math.min(auditId.length(), AUDIT_ID_MAX_LENGTH));
+    }
+
+    synchronized public void logEvent(final ATSEvent event) {
+        executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                TimelineEntity entity = new TimelineEntity();
+                entity.setEntityId(event.pigScriptId);
+                entity.setEntityType(ENTITY_TYPE);
+                entity.addPrimaryFilter(ENTITY_CALLERID, event.callerId!=null?event.callerId : "default");
+                try {
+                    timelineClient.putEntities(entity);
+                } catch (Exception e) {
+                    log.info("Failed to submit plan to ATS: " + 
e.getMessage());
+                }
+            }
+        });
+    }
+}
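
For orientation, a minimal sketch of how the new ATS hook might be exercised from client code; the PigContext construction and the caller id shown here are illustrative assumptions, not part of the commit:

import java.util.Properties;
import org.apache.pig.ExecType;
import org.apache.pig.backend.hadoop.PigATSClient;
import org.apache.pig.impl.PigContext;

public class AtsHookSketch {
    public static void main(String[] args) throws Exception {
        // Minimal context; in a real run this comes from PigServer.
        PigContext ctx = new PigContext(ExecType.LOCAL, new Properties());
        // Derives something like "PIG-<scriptname>-<scriptid>", capped at 128 chars.
        String auditId = PigATSClient.getPigAuditId(ctx);
        // Asynchronously posts a timeline entity keyed by the audit id;
        // "hypothetical-caller" is a made-up caller id.
        PigATSClient.getInstance().logEvent(
                new PigATSClient.ATSEvent(auditId, "hypothetical-caller"));
    }
}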

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
+import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State;
+
+/**
+ * extends the hadoop JobControl to remove the hardcoded sleep(5000)
+ * as most of this is private we have to use reflection
+ *
+ * See {@link https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java }
+ *
+ */
+public class PigJobControl extends JobControl {
+  private static final Log log = LogFactory.getLog(PigJobControl.class);
+
+  private static Field runnerState;
+  private static Field jobsInProgress;
+  private static Field successfulJobs;
+  private static Field failedJobs;
+
+  private static Method failAllJobs;
+
+  private static Method checkState;
+  private static Method submit;
+
+  private static boolean initSuccesful;
+
+  static {
+    try {
+
+      runnerState = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("runnerState");
+      runnerState.setAccessible(true);
+      jobsInProgress = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("jobsInProgress");
+      jobsInProgress.setAccessible(true);
+      successfulJobs = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("successfulJobs");
+      successfulJobs.setAccessible(true);
+      failedJobs = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("failedJobs");
+      failedJobs.setAccessible(true);
+
+      failAllJobs = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredMethod("failAllJobs", Throwable.class);
+      failAllJobs.setAccessible(true);
+
+      checkState = ControlledJob.class.getDeclaredMethod("checkState");
+      checkState.setAccessible(true);
+      submit = ControlledJob.class.getDeclaredMethod("submit");
+      submit.setAccessible(true);
+
+      initSuccesful = true;
+    } catch (Exception e) {
+      log.debug("falling back to default JobControl (not using hadoop 0.23 
?)", e);
+      initSuccesful = false;
+    }
+  }
+
+  protected int timeToSleep;
+
+  /**
+   * Construct a job control for a group of jobs.
+   * @param groupName a name identifying this group
+   * @param timeToSleep the time, in milliseconds, to sleep between job state checks
+   */
+  public PigJobControl(String groupName, int timeToSleep) {
+    super(groupName);
+    this.timeToSleep = timeToSleep;
+  }
+
+  public int getTimeToSleep() {
+    return timeToSleep;
+  }
+
+  public void setTimeToSleep(int timeToSleep) {
+    this.timeToSleep = timeToSleep;
+  }
+
+  private void setRunnerState(ThreadState state) {
+    try {
+      runnerState.set(this, state);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+
+  private ThreadState getRunnerState() {
+    try {
+      return (ThreadState)runnerState.get(this);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private State checkState(ControlledJob j) {
+    try {
+      return (State)checkState.invoke(j);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private State submit(ControlledJob j) {
+    try {
+      return (State)submit.invoke(j);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private LinkedList<ControlledJob> getJobs(Field field) {
+    try {
+      return (LinkedList<ControlledJob>)field.get(this);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void failAllJobs(Throwable t) {
+    try {
+      failAllJobs.invoke(this, t);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   *  The main loop for the thread.
+   *  The loop does the following:
+   *    Check the states of the running jobs
+   *    Update the states of waiting jobs
+   *    Submit the jobs in ready state
+   */
+  public void run() {
+    if (!initSuccesful) {
+      super.run();
+      return;
+    }
+    try {
+      setRunnerState(ThreadState.RUNNING);
+      while (true) {
+        while (getRunnerState() == ThreadState.SUSPENDED) {
+          try {
+            Thread.sleep(timeToSleep);
+          }
+          catch (Exception e) {
+            //TODO the thread was interrupted, do something!!!
+          }
+        }
+
+        synchronized(this) {
+          Iterator<ControlledJob> it = getJobs(jobsInProgress).iterator();
+          if (!it.hasNext()) {
+              stop();
+          }
+          while(it.hasNext()) {
+            ControlledJob j = it.next();
+
+            // TODO: Need to re-visit the following try...catch
+            // when Pig picks up a Hadoop release with MAPREDUCE-6762 applied
+            // as its dependency.
+            try {
+              log.debug("Checking state of job " + j);
+            } catch(NullPointerException npe) {
+              log.warn("Failed to get job name " +
+                "when checking state of job. " +
+                "Check if job status is null.", npe);
+            }
+
+            switch(checkState(j)) {
+            case SUCCESS:
+              getJobs(successfulJobs).add(j);
+              it.remove();
+              break;
+            case FAILED:
+            case DEPENDENT_FAILED:
+              getJobs(failedJobs).add(j);
+              it.remove();
+              break;
+            case READY:
+              submit(j);
+              break;
+            case RUNNING:
+            case WAITING:
+              //Do Nothing
+              break;
+            }
+          }
+        }
+
+        if (getRunnerState() != ThreadState.RUNNING &&
+            getRunnerState() != ThreadState.SUSPENDED) {
+          break;
+        }
+        try {
+          Thread.sleep(timeToSleep);
+        }
+        catch (Exception e) {
+          //TODO the thread was interrupted, do something!!!
+        }
+        if (getRunnerState() != ThreadState.RUNNING &&
+            getRunnerState() != ThreadState.SUSPENDED) {
+          break;
+        }
+      }
+    }catch(Throwable t) {
+      log.error("Error while trying to run jobs.",t);
+      //Mark all jobs as failed because we got something bad.
+      failAllJobs(t);
+    }
+    setRunnerState(ThreadState.STOPPED);
+  }
+
+
+}
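
A rough sketch of how PigJobControl is meant to be driven (JobControlCompiler below constructs it with a configurable sleep in the same way); the group name, 100 ms interval, and thread handling are illustrative assumptions:

import org.apache.pig.backend.hadoop.PigJobControl;

public class JobControlSketch {
    public static void main(String[] args) throws Exception {
        // Poll job state every 100 ms instead of JobControl's hardcoded 5000 ms.
        PigJobControl jobCtrl = new PigJobControl("pig-jobs", 100);
        // jobCtrl.addJob(...) would register ControlledJob instances here.
        Thread runner = new Thread(jobCtrl, "JobControl");
        runner.setDaemon(true);
        runner.start();
        while (!jobCtrl.allFinished()) {
            Thread.sleep(100);
        }
        jobCtrl.stop();
    }
}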

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java Fri Feb 24 08:19:42 2017
@@ -17,8 +17,6 @@
 package org.apache.pig.backend.hadoop.accumulo;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Collection;
@@ -303,24 +301,8 @@ public abstract class AbstractAccumuloSt
      */
     protected void simpleUnset(Configuration conf,
             Map<String, String> entriesToUnset) {
-        try {
-            Method unset = conf.getClass().getMethod("unset", String.class);
-
-            for (String key : entriesToUnset.keySet()) {
-                unset.invoke(conf, key);
-            }
-        } catch (NoSuchMethodException e) {
-            log.error("Could not invoke Configuration.unset method", e);
-            throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-            log.error("Could not invoke Configuration.unset method", e);
-            throw new RuntimeException(e);
-        } catch (IllegalArgumentException e) {
-            log.error("Could not invoke Configuration.unset method", e);
-            throw new RuntimeException(e);
-        } catch (InvocationTargetException e) {
-            log.error("Could not invoke Configuration.unset method", e);
-            throw new RuntimeException(e);
+        for (String key : entriesToUnset.keySet()) {
+            conf.unset(key);
         }
     }
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java Fri Feb 24 08:19:42 2017
@@ -22,8 +22,6 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLDecoder;
 import java.text.MessageFormat;
@@ -42,6 +40,7 @@ import java.util.zip.ZipOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Logger;
 
@@ -112,7 +111,7 @@ public class Utils {
         // attempt to locate an existing jar for the class.
         String jar = findContainingJar(my_class, packagedClasses);
         if (null == jar || jar.isEmpty()) {
-            jar = getJar(my_class);
+            jar = JarFinder.getJar(my_class);
             updateMap(jar, packagedClasses);
         }
 
@@ -200,41 +199,6 @@ public class Utils {
     }
 
     /**
-     * Invoke 'getJar' on a JarFinder implementation. Useful for some job
-     * configuration contexts (HBASE-8140) and also for testing on MRv2. First
-     * check if we have HADOOP-9426. Lacking that, fall back to the backport.
-     * 
-     * @param my_class
-     *            the class to find.
-     * @return a jar file that contains the class, or null.
-     */
-    private static String getJar(Class<?> my_class) {
-        String ret = null;
-        String hadoopJarFinder = "org.apache.hadoop.util.JarFinder";
-        Class<?> jarFinder = null;
-        try {
-            log.debug("Looking for " + hadoopJarFinder + ".");
-            jarFinder = Class.forName(hadoopJarFinder);
-            log.debug(hadoopJarFinder + " found.");
-            Method getJar = jarFinder.getMethod("getJar", Class.class);
-            ret = (String) getJar.invoke(null, my_class);
-        } catch (ClassNotFoundException e) {
-            log.debug("Using backported JarFinder.");
-            ret = jarFinderGetJar(my_class);
-        } catch (InvocationTargetException e) {
-            // function was properly called, but threw it's own exception.
-            // Unwrap it
-            // and pass it on.
-            throw new RuntimeException(e.getCause());
-        } catch (Exception e) {
-            // toss all other exceptions, related to reflection failure
-            throw new RuntimeException("getJar invocation failed.", e);
-        }
-
-        return ret;
-    }
-
-    /**
      * Returns the full path to the Jar containing the class. It always return a
      * JAR.
      * 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java Fri Feb 24 08:19:42 2017
@@ -29,7 +29,6 @@ import org.apache.pig.ExecType;
 import org.apache.pig.PigConstants;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 
 public class ConfigurationUtil {
@@ -89,7 +88,7 @@ public class ConfigurationUtil {
             // so build/classes/hadoop-site.xml contains such entry. This prevents some tests from
             // successful (They expect those files in hdfs), so we need to unset it in hadoop 23.
             // This should go away once MiniMRCluster fix the distributed cache issue.
-            HadoopShims.unsetConf(localConf, MRConfiguration.JOB_CACHE_FILES);
+            localConf.unset(MRConfiguration.JOB_CACHE_FILES);
         }
         localConf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
         Properties props = ConfigurationUtil.toProperties(localConf);
@@ -106,4 +105,14 @@ public class ConfigurationUtil {
             }
         }
     }
+
+    /**
+     * Returns Properties containing alternative names of given property and same values - can be used to solve deprecations
+     * @return
+     */
+    public static Properties expandForAlternativeNames(String name, String value){
+        final Configuration config = new Configuration(false);
+        config.set(name,value);
+        return ConfigurationUtil.toProperties(config);
+    }
 }
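
The new expandForAlternativeNames() helper leans on Hadoop's static key-deprecation table: setting a deprecated key on a throwaway Configuration also exposes it under its current name, and toProperties() then captures both spellings. A hedged usage sketch (the exact keys produced depend on the Hadoop version on the classpath):

import java.util.Properties;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;

public class AlternativeNamesSketch {
    public static void main(String[] args) {
        // "fs.default.name" is deprecated in Hadoop 2.x in favour of "fs.defaultFS".
        Properties p = ConfigurationUtil.expandForAlternativeNames(
                "fs.default.name", "hdfs://namenode:8020");
        // Both the old and the new key are expected to carry the same value.
        System.out.println(p.getProperty("fs.default.name"));
        System.out.println(p.getProperty("fs.defaultFS"));
    }
}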

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java Fri Feb 24 08:19:42 2017
@@ -18,20 +18,20 @@
 
 package org.apache.pig.backend.hadoop.datastorage;
 
-import java.net.URI;
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
 import java.util.Enumeration;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
@@ -40,8 +40,6 @@ import org.apache.pig.backend.datastorag
 
 public class HDataStorage implements DataStorage {
 
-    private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
-
     private FileSystem fs;
     private Configuration configuration;
     private Properties properties;
@@ -58,9 +56,10 @@ public class HDataStorage implements Dat
         init();
     }
 
+    @Override
     public void init() {
         // check if name node is set, if not we set local as fail back
-        String nameNode = this.properties.getProperty(FILE_SYSTEM_LOCATION);
+        String nameNode = this.properties.getProperty(FileSystem.FS_DEFAULT_NAME_KEY);
         if (nameNode == null || nameNode.length() == 0) {
             nameNode = "local";
         }
@@ -76,14 +75,17 @@ public class HDataStorage implements Dat
         }
     }
 
+    @Override
     public void close() throws IOException {
         fs.close();
     }
-    
+
+    @Override
     public Properties getConfiguration() {
         return this.properties;
     }
 
+    @Override
     public void updateConfiguration(Properties newConfiguration)
             throws DataStorageException {
         // TODO sgroschupf 25Feb2008 this method is never called and
@@ -92,38 +94,40 @@ public class HDataStorage implements Dat
         if (newConfiguration == null) {
             return;
         }
-        
+
         Enumeration<Object> newKeys = newConfiguration.keys();
-        
+
         while (newKeys.hasMoreElements()) {
             String key = (String) newKeys.nextElement();
             String value = null;
-            
+
             value = newConfiguration.getProperty(key);
-            
+
             fs.getConf().set(key,value);
         }
     }
-    
+
+    @Override
     public Map<String, Object> getStatistics() throws IOException {
         Map<String, Object> stats = new HashMap<String, Object>();
 
         long usedBytes = fs.getUsed();
         stats.put(USED_BYTES_KEY , Long.valueOf(usedBytes).toString());
-        
+
         if (fs instanceof DistributedFileSystem) {
             DistributedFileSystem dfs = (DistributedFileSystem) fs;
-            
+
             long rawCapacityBytes = dfs.getRawCapacity();
             stats.put(RAW_CAPACITY_KEY, Long.valueOf(rawCapacityBytes).toString());
-            
+
             long rawUsedBytes = dfs.getRawUsed();
             stats.put(RAW_USED_KEY, Long.valueOf(rawUsedBytes).toString());
         }
-        
+
         return stats;
     }
-    
+
+    @Override
     public ElementDescriptor asElement(String name) throws DataStorageException {
         if (this.isContainer(name)) {
             return new HDirectory(this, name);
@@ -132,70 +136,82 @@ public class HDataStorage implements Dat
             return new HFile(this, name);
         }
     }
-    
+
+    @Override
     public ElementDescriptor asElement(ElementDescriptor element)
             throws DataStorageException {
         return asElement(element.toString());
     }
-    
+
+    @Override
     public ElementDescriptor asElement(String parent,
-                                                  String child) 
+                                                  String child)
             throws DataStorageException {
         return asElement((new Path(parent, child)).toString());
     }
 
+    @Override
     public ElementDescriptor asElement(ContainerDescriptor parent,
-                                                  String child) 
+                                                  String child)
             throws DataStorageException {
         return asElement(parent.toString(), child);
     }
 
+    @Override
     public ElementDescriptor asElement(ContainerDescriptor parent,
-                                                  ElementDescriptor child) 
+                                                  ElementDescriptor child)
             throws DataStorageException {
         return asElement(parent.toString(), child.toString());
     }
 
-    public ContainerDescriptor asContainer(String name) 
+    @Override
+    public ContainerDescriptor asContainer(String name)
             throws DataStorageException {
         return new HDirectory(this, name);
     }
-    
+
+    @Override
     public ContainerDescriptor asContainer(ContainerDescriptor container)
             throws DataStorageException {
         return new HDirectory(this, container.toString());
     }
-    
+
+    @Override
     public ContainerDescriptor asContainer(String parent,
-                                                      String child) 
+                                                      String child)
             throws DataStorageException {
         return new HDirectory(this, parent, child);
     }
 
+    @Override
     public ContainerDescriptor asContainer(ContainerDescriptor parent,
-                                                      String child) 
+                                                      String child)
             throws DataStorageException {
         return new HDirectory(this, parent.toString(), child);
     }
-    
+
+    @Override
     public ContainerDescriptor asContainer(ContainerDescriptor parent,
                                                       ContainerDescriptor child)
             throws DataStorageException {
         return new HDirectory(this, parent.toString(), child.toString());
     }
-    
+
+    @Override
     public void setActiveContainer(ContainerDescriptor container) {
         fs.setWorkingDirectory(new Path(container.toString()));
     }
-    
+
+    @Override
     public ContainerDescriptor getActiveContainer() {
         return new HDirectory(this, fs.getWorkingDirectory());
     }
 
+    @Override
     public boolean isContainer(String name) throws DataStorageException {
         boolean isContainer = false;
         Path path = new Path(name);
-        
+
         try {
             if ((this.fs.exists(path)) && (! this.fs.isFile(path))) {
                 isContainer = true;
@@ -206,10 +222,11 @@ public class HDataStorage implements Dat
             String msg = "Unable to check name " + name;
             throw new DataStorageException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e);
         }
-        
+
         return isContainer;
     }
-    
+
+    @Override
     public HPath[] asCollection(String pattern) throws DataStorageException {
         try {
             FileStatus[] paths = this.fs.globStatus(new Path(pattern));
@@ -218,7 +235,7 @@ public class HDataStorage implements Dat
                 return new HPath[0];
 
             List<HPath> hpaths = new ArrayList<HPath>();
-            
+
             for (int i = 0; i < paths.length; ++i) {
                 HPath hpath = (HPath)this.asElement(paths[i].getPath().toString());
                 if (!hpath.systemElement()) {
@@ -233,7 +250,7 @@ public class HDataStorage implements Dat
             throw new DataStorageException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e);
         }
     }
-    
+
     public FileSystem getHFS() {
         return fs;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Feb 24 08:19:42 2017
@@ -30,6 +30,7 @@ import java.util.Properties;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.PigException;
@@ -76,8 +77,6 @@ public abstract class HExecutionEngine i
     public static final String MAPRED_DEFAULT_SITE = "mapred-default.xml";
     public static final String YARN_DEFAULT_SITE = "yarn-default.xml";
 
-    public static final String FILE_SYSTEM_LOCATION = "fs.default.name";
-    public static final String ALTERNATIVE_FILE_SYSTEM_LOCATION = "fs.defaultFS";
     public static final String LOCAL = "local";
 
     protected PigContext pigContext;
@@ -203,8 +202,8 @@ public abstract class HExecutionEngine i
                 properties.setProperty(MRConfiguration.FRAMEWORK_NAME, LOCAL);
             }
             properties.setProperty(MRConfiguration.JOB_TRACKER, LOCAL);
-            properties.setProperty(FILE_SYSTEM_LOCATION, "file:///");
-            properties.setProperty(ALTERNATIVE_FILE_SYSTEM_LOCATION, "file:///");
+            properties.remove("fs.default.name"); //Deprecated in Hadoop 2.x
+            properties.setProperty(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
 
             jc = getLocalConf();
             JobConf s3Jc = getS3Conf();
@@ -220,24 +219,7 @@ public abstract class HExecutionEngine i
         HKerberos.tryKerberosKeytabLogin(jc);
 
         cluster = jc.get(MRConfiguration.JOB_TRACKER);
-        nameNode = jc.get(FILE_SYSTEM_LOCATION);
-        if (nameNode == null) {
-            nameNode = (String) pigContext.getProperties().get(ALTERNATIVE_FILE_SYSTEM_LOCATION);
-        }
-
-        if (cluster != null && cluster.length() > 0) {
-            if (!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) {
-                cluster = cluster + ":50020";
-            }
-            properties.setProperty(MRConfiguration.JOB_TRACKER, cluster);
-        }
-
-        if (nameNode != null && nameNode.length() > 0) {
-            if (!nameNode.contains(":") && !nameNode.equalsIgnoreCase(LOCAL)) {
-                nameNode = nameNode + ":8020";
-            }
-            properties.setProperty(FILE_SYSTEM_LOCATION, nameNode);
-        }
+        nameNode = jc.get(FileSystem.FS_DEFAULT_NAME_KEY);
 
         LOG.info("Connecting to hadoop file system at: "
                 + (nameNode == null ? LOCAL : nameNode));
@@ -369,7 +351,11 @@ public abstract class HExecutionEngine i
     @Override
     public void setProperty(String property, String value) {
         Properties properties = pigContext.getProperties();
-        properties.put(property, value);
+        if (Configuration.isDeprecated(property)) {
+            properties.putAll(ConfigurationUtil.expandForAlternativeNames(property, value));
+        } else {
+            properties.put(property, value);
+        }
     }
 
     @Override
@@ -378,6 +364,13 @@ public abstract class HExecutionEngine i
     }
 
     @Override
+    public void kill() throws BackendException {
+        if (launcher != null) {
+            launcher.kill();
+        }
+    }
+
+    @Override
     public void killJob(String jobID) throws BackendException {
         if (launcher != null) {
             launcher.killJob(jobID, getJobConf());
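
The reworked setProperty() above branches on Hadoop's deprecation table before storing a value. A small sketch of that check in isolation; the printed results are what Hadoop 2.x is expected to report:

import org.apache.hadoop.conf.Configuration;

public class DeprecationCheckSketch {
    public static void main(String[] args) {
        // Deprecated key: setProperty() expands it to all alternative names.
        System.out.println(Configuration.isDeprecated("fs.default.name")); // expected: true
        // Current key: stored as-is.
        System.out.println(Configuration.isDeprecated("fs.defaultFS"));    // expected: false
    }
}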

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Fri Feb 24 08:19:42 2017
@@ -40,7 +40,7 @@ import org.apache.pig.tools.pigstats.Pig
 public class HJob implements ExecJob {
 
     private final Log log = LogFactory.getLog(getClass());
-    
+
     protected JOB_STATUS status;
     protected PigContext pigContext;
     protected FileSpec outFileSpec;
@@ -48,7 +48,7 @@ public class HJob implements ExecJob {
     protected String alias;
     protected POStore poStore;
     private PigStats stats;
-    
+
     public HJob(JOB_STATUS status,
                 PigContext pigContext,
                 POStore store,
@@ -59,7 +59,7 @@ public class HJob implements ExecJob {
         this.outFileSpec = poStore.getSFile();
         this.alias = alias;
     }
-    
+
     public HJob(JOB_STATUS status,
             PigContext pigContext,
             POStore store,
@@ -72,37 +72,41 @@ public class HJob implements ExecJob {
         this.alias = alias;
         this.stats = stats;
     }
-    
+
+    @Override
     public JOB_STATUS getStatus() {
         return status;
     }
-    
+
+    @Override
     public boolean hasCompleted() throws ExecException {
         return true;
     }
-    
+
+    @Override
     public Iterator<Tuple> getResults() throws ExecException {
         final LoadFunc p;
-        
+
         try{
-             LoadFunc originalLoadFunc = 
+             LoadFunc originalLoadFunc =
                  (LoadFunc)PigContext.instantiateFuncFromSpec(
                          outFileSpec.getFuncSpec());
-             
-             p = (LoadFunc) new ReadToEndLoader(originalLoadFunc, 
+
+             p = (LoadFunc) new ReadToEndLoader(originalLoadFunc,
                      ConfigurationUtil.toConfiguration(
-                     pigContext.getProperties()), outFileSpec.getFileName(), 0, pigContext);
+                     pigContext.getProperties()), outFileSpec.getFileName(), 0);
 
         }catch (Exception e){
             int errCode = 2088;
             String msg = "Unable to get results for: " + outFileSpec;
             throw new ExecException(msg, errCode, PigException.BUG, e);
         }
-        
+
         return new Iterator<Tuple>() {
             Tuple   t;
             boolean atEnd;
 
+            @Override
             public boolean hasNext() {
                 if (atEnd)
                     return false;
@@ -120,6 +124,7 @@ public class HJob implements ExecJob {
                 return !atEnd;
             }
 
+            @Override
             public Tuple next() {
                 Tuple next = t;
                 if (next != null) {
@@ -136,6 +141,7 @@ public class HJob implements ExecJob {
                 return next;
             }
 
+            @Override
             public void remove() {
                 throw new RuntimeException("Removal not supported");
             }
@@ -143,31 +149,38 @@ public class HJob implements ExecJob {
         };
     }
 
+    @Override
     public Properties getConfiguration() {
         return pigContext.getProperties();
     }
 
+    @Override
     public PigStats getStatistics() {
         //throw new UnsupportedOperationException();
         return stats;
     }
 
+    @Override
     public void completionNotification(Object cookie) {
         throw new UnsupportedOperationException();
     }
-    
+
+    @Override
     public void kill() throws ExecException {
         throw new UnsupportedOperationException();
     }
-    
+
+    @Override
     public void getLogs(OutputStream log) throws ExecException {
         throw new UnsupportedOperationException();
     }
-    
+
+    @Override
     public void getSTDOut(OutputStream out) throws ExecException {
         throw new UnsupportedOperationException();
     }
-    
+
+    @Override
     public void getSTDError(OutputStream error) throws ExecException {
         throw new UnsupportedOperationException();
     }
@@ -176,6 +189,7 @@ public class HJob implements ExecJob {
         backendException = e;
     }
 
+    @Override
     public Exception getException() {
         return backendException;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Fri Feb 24 08:19:42 2017
@@ -32,7 +32,8 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.TIPStatus;
+import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.FuncSpec;
@@ -40,7 +41,6 @@ import org.apache.pig.PigException;
 import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.PlanException;
@@ -76,7 +76,7 @@ public abstract class Launcher {
     protected Map<FileSpec, Exception> failureMap;
     protected JobControl jc = null;
 
-    class HangingJobKiller extends Thread {
+    protected class HangingJobKiller extends Thread {
         public HangingJobKiller() {}
 
         @Override
@@ -90,7 +90,6 @@ public abstract class Launcher {
     }
 
     protected Launcher() {
-        Runtime.getRuntime().addShutdownHook(new HangingJobKiller());
         // handle the windows portion of \r
         if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
             newLine = "\r\n";
@@ -104,7 +103,6 @@ public abstract class Launcher {
     public void reset() {
         failureMap = Maps.newHashMap();
         totalHadoopTimeSpent = 0;
-        jc = null;
     }
 
     /**
@@ -179,7 +177,7 @@ public abstract class Launcher {
             String exceptionCreateFailMsg = null;
             boolean jobFailed = false;
             if (msgs.length > 0) {
-                if (HadoopShims.isJobFailed(report)) {
+                if (report.getCurrentStatus()== TIPStatus.FAILED) {
                     jobFailed = true;
                 }
                 Set<String> errorMessageSet = new HashSet<String>();
@@ -261,11 +259,30 @@ public abstract class Launcher {
 
         List<Job> runnJobs = jc.getRunningJobs();
         for (Job j : runnJobs) {
-            prog += HadoopShims.progressOfRunningJob(j);
+            prog += progressOfRunningJob(j);
         }
         return prog;
     }
 
+    /**
+     * Returns the progress of a Job j which is part of a submitted JobControl
+     * object. The progress is for this Job. So it has to be scaled down by the
+     * num of jobs that are present in the JobControl.
+     *
+     * @param j The Job for which progress is required
+     * @return Returns the percentage progress of this Job
+     * @throws IOException
+     */
+    private static double progressOfRunningJob(Job j)
+            throws IOException {
+        org.apache.hadoop.mapreduce.Job mrJob = j.getJob();
+        try {
+            return (mrJob.mapProgress() + mrJob.reduceProgress()) / 2;
+        } catch (Exception ir) {
+            return 0;
+        }
+    }
+
     public long getTotalHadoopTimeSpent() {
         return totalHadoopTimeSpent;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Fri Feb 24 08:19:42 2017
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
@@ -122,7 +123,8 @@ public class FetchLauncher {
         poStore.setUp();
 
         TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID();
-        HadoopShims.setTaskAttemptId(conf, taskAttemptID);
+        //Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
+        conf.setInt(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, taskAttemptID.getId());
 
         if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
             MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java Fri Feb 24 08:19:42 2017
@@ -95,7 +95,7 @@ public class FetchPOStoreImpl extends PO
         }
         if (outputCommitter.needsTaskCommit(context))
             outputCommitter.commitTask(context);
-        HadoopShims.commitOrCleanup(outputCommitter, context);
+        outputCommitter.commitJob(context);
     }
 
     @Override
@@ -109,7 +109,7 @@ public class FetchPOStoreImpl extends PO
             }
             writer = null;
         }
-        HadoopShims.commitOrCleanup(outputCommitter, context);
+        outputCommitter.commitJob(context);
     }
 
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java Fri Feb 24 08:19:42 2017
@@ -22,43 +22,48 @@ import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Reducer;
-
-import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
 
 /**
  * A special implementation of combiner used only for distinct.  This combiner
  * does not even parse out the records.  It just throws away duplicate values
- * in the key in order ot minimize the data being sent to the reduce.
+ * in the key in order to minimize the data being sent to the reduce.
  */
 public class DistinctCombiner {
 
-    public static class Combine 
+    public static class Combine
         extends Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
-        
+
         private final Log log = LogFactory.getLog(getClass());
 
-        ProgressableReporter pigReporter;
-        
-        /**
-         * Configures the reporter 
-         */
+        private static boolean firstTime = true;
+
+        //@StaticDataCleanup
+        public static void staticDataCleanup() {
+            firstTime = true;
+        }
+
         @Override
         protected void setup(Context context) throws IOException, InterruptedException {
             super.setup(context);
-            pigReporter = new ProgressableReporter();
+            Configuration jConf = context.getConfiguration();
+            // Avoid log spamming
+            if (firstTime) {
+                log.info("Aliases being processed per job phase 
(AliasName[line,offset]): " + jConf.get("pig.alias.location"));
+                firstTime = false;
+            }
         }
-        
+
         /**
          * The reduce function which removes values.
          */
         @Override
-        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) 
+        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
                 throws IOException, InterruptedException {
-            
-            pigReporter.setRep(context);
 
             // Take the first value and the key and collect
             // just that.
@@ -66,6 +71,7 @@ public class DistinctCombiner {
             NullableTuple val = iter.next();
             context.write(key, val);
         }
+
     }
-    
+
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java Fri Feb 24 08:19:42 2017
@@ -75,16 +75,24 @@ public class FileBasedOutputSizeReader i
             return -1;
         }
 
-        long bytes = 0;
         Path p = new Path(getLocationUri(sto));
-        FileSystem fs = p.getFileSystem(conf);
-        FileStatus[] lst = fs.listStatus(p);
+        return getPathSize(p, p.getFileSystem(conf));
+    }
+
+    private long getPathSize(Path storePath, FileSystem fs) throws IOException {
+        long bytes = 0;
+        FileStatus[] lst = fs.listStatus(storePath);
         if (lst != null) {
             for (FileStatus status : lst) {
-                bytes += status.getLen();
+                if (status.isFile()) {
+                    if (status.getLen() > 0)
+                        bytes += status.getLen();
+                }
+                else { // recursively count nested leaves' (files) sizes
+                    bytes += getPathSize(status.getPath(), fs);
+                }
             }
         }
-
         return bytes;
     }
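
A standalone illustration of the recursive traversal that the new getPathSize() performs; pathSize() here mirrors the private helper above and the directory path is hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OutputSizeSketch {
    // Files contribute their length; directories are walked recursively.
    static long pathSize(Path p, FileSystem fs) throws Exception {
        long bytes = 0;
        FileStatus[] lst = fs.listStatus(p);
        if (lst != null) {
            for (FileStatus status : lst) {
                bytes += status.isFile() ? status.getLen() : pathSize(status.getPath(), fs);
            }
        }
        return bytes;
    }

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        System.out.println(pathSize(new Path("/tmp/pig-out"), fs)); // hypothetical output dir
    }
}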
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Fri Feb 24 08:19:42 2017
@@ -92,7 +92,7 @@ public class InputSizeReducerEstimator i
         return reducers;
     }
 
-    static long getTotalInputFileSize(Configuration conf,
+    public static long getTotalInputFileSize(Configuration conf,
             List<POLoad> lds, Job job) throws IOException {
         return getTotalInputFileSize(conf, lds, job, Long.MAX_VALUE);
     }
@@ -100,7 +100,7 @@ public class InputSizeReducerEstimator i
     /**
      * Get the input size for as many inputs as possible. Inputs that do not report
      * their size nor can pig look that up itself are excluded from this size.
-     * 
+     *
      * @param conf Configuration
      * @param lds List of POLoads
      * @param job Job

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Feb 24 08:19:42 2017
@@ -24,7 +24,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.lang.reflect.Method;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -61,6 +60,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobPriority;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
@@ -71,6 +71,7 @@ import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.PigJobControl;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
@@ -89,7 +90,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataType;
@@ -122,6 +122,7 @@ import org.apache.pig.impl.util.ObjectSe
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
 
@@ -311,7 +312,7 @@ public class JobControlCompiler{
                     " should be a time in ms. default=" + 
defaultPigJobControlSleep, e);
         }
 
-        JobControl jobCtrl = HadoopShims.newJobControl(grpName, timeToSleep);
+        JobControl jobCtrl = new PigJobControl(grpName, timeToSleep);
 
         try {
             List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
@@ -384,7 +385,7 @@ public class JobControlCompiler{
         ArrayList<Pair<String,Long>> counterPairs;
 
         try {
-            counters = HadoopShims.getCounters(job);
+            counters = MRJobStats.getCounters(job);
 
             String groupName = getGroupName(counters.getGroupNames());
             // In case that the counter group was not find, we need to find
@@ -702,7 +703,8 @@ public class JobControlCompiler{
             // since this path would be invalid for the new job being created
             pigContext.getProperties().remove("mapreduce.job.credentials.binary");
 
-            conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
+            conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pigContext.getExecType().isLocal());
+            conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pigContext.getLog4jProperties()));
+            conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList()));
             // this is for unit tests since some don't create PigServer
 
@@ -1671,14 +1673,6 @@ public class JobControlCompiler{
         if (distCachePath != null) {
             log.info("Jar file " + url + " already in DistributedCache as "
                     + distCachePath + ". Not copying to hdfs and adding again");
-            // Path already in dist cache
-            if (!HadoopShims.isHadoopYARN()) {
-                // Mapreduce in YARN includes $PWD/* which will add all *.jar files in classapth.
-                // So don't have to ensure that the jar is separately added to mapreduce.job.classpath.files
-                // But path may only be in 'mapred.cache.files' and not be in
-                // 'mapreduce.job.classpath.files' in Hadoop 1.x. So adding it there
-                DistributedCache.addFileToClassPath(distCachePath, conf, distCachePath.getFileSystem(conf));
-            }
         }
         else {
             // REGISTER always copies locally the jar file. see PigServer.registerJar()
@@ -1964,20 +1958,9 @@ public class JobControlCompiler{
 
     public static void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
         // the OutputFormat we report to Hadoop is always PigOutputFormat which
-        // can be wrapped with LazyOutputFormat provided if it is supported by
-        // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+        // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set
         if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
-            try {
-                Class<?> clazz = PigContext
-                        .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
-                Method method = clazz.getMethod("setOutputFormatClass",
-                        org.apache.hadoop.mapreduce.Job.class, Class.class);
-                method.invoke(null, job, PigOutputFormat.class);
-            } catch (Exception e) {
-                job.setOutputFormatClass(PigOutputFormat.class);
-                log.warn(PigConfiguration.PIG_OUTPUT_LAZY
-                        + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
-            }
+            LazyOutputFormat.setOutputFormatClass(job,PigOutputFormat.class);
         } else {
             job.setOutputFormatClass(PigOutputFormat.class);
         }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Feb 24 08:19:42 2017
@@ -1116,7 +1116,9 @@ public class MRCompiler extends PhyPlanV
         try{
             nonBlocking(op);
             phyToMROpMap.put(op, curMROp);
-            if (op.getPkgr().getPackageType() == PackageType.JOIN) {
+            if (op.getPkgr().getPackageType() == PackageType.JOIN
+                    || op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) {
+                // Bloom join is not implemented in mapreduce mode and falls back to regular join
                 curMROp.markRegularJoin();
             } else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
                 if (op.getNumInps() == 1) {
@@ -1278,7 +1280,7 @@ public class MRCompiler extends PhyPlanV
                             List<InputSplit> splits = inf.getSplits(HadoopShims.cloneJobContext(job));
                             List<List<InputSplit>> results = MapRedUtil
                             .getCombinePigSplits(splits,
-                                    HadoopShims.getDefaultBlockSize(fs, path),
+                                    fs.getDefaultBlockSize(path),
                                     conf);
                             numFiles += results.size();
                         } else {
@@ -2432,7 +2434,7 @@ public class MRCompiler extends PhyPlanV
         }else{
             for(int i=0; i<transformPlans.size(); i++) {
                 eps1.add(transformPlans.get(i));
-                flat1.add(true);
+                flat1.add(i == transformPlans.size() - 1 ? true : false);
             }
         }
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
 Fri Feb 24 08:19:42 2017
@@ -19,7 +19,9 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -40,7 +42,8 @@ import org.apache.hadoop.mapred.JobClien
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.pig.PigConfiguration;
@@ -65,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -78,15 +82,18 @@ import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
 
+import org.python.google.common.collect.Lists;
+
 
 /**
  * Main class that launches pig for Map Reduce
  *
  */
-public class MapReduceLauncher extends Launcher{
+public class MapReduceLauncher extends Launcher {
 
     public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
 
@@ -94,14 +101,30 @@ public class MapReduceLauncher extends L
 
     private boolean aggregateWarning = false;
 
+    public MapReduceLauncher() {
+        super();
+        Utils.addShutdownHookWithPriority(new HangingJobKiller(),
+                PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
+    }
+
     @Override
     public void kill() {
         try {
-            log.debug("Receive kill signal");
-            if (jc!=null) {
+            if (jc != null && jc.getRunningJobs().size() > 0) {
+                log.info("Received kill signal");
                 for (Job job : jc.getRunningJobs()) {
-                    HadoopShims.killJob(job);
+                    org.apache.hadoop.mapreduce.Job mrJob = job.getJob();
+                    try {
+                        if (mrJob != null) {
+                            mrJob.killJob();
+                        }
+                    } catch (Exception ir) {
+                        throw new IOException(ir);
+                    }
                     log.info("Job " + job.getAssignedJobID() + " killed");
+                    String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+                            .format(Calendar.getInstance().getTime());
+                    System.err.println(timeStamp + " Job " + job.getAssignedJobID() + " killed");
                 }
             }
         } catch (Exception e) {
@@ -301,8 +324,7 @@ public class MapReduceLauncher extends L
                 // Now wait, till we are finished.
                 while(!jc.allFinished()){
 
-                    try { jcThread.join(sleepTime); }
-                    catch (InterruptedException e) {}
+                    jcThread.join(sleepTime);
 
                     List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>();
 
@@ -321,11 +343,6 @@ public class MapReduceLauncher extends L
                                 log.info("detailed locations: " + 
aliasLocation);
                             }
 
-                            if (!HadoopShims.isHadoopYARN() && jobTrackerLoc != null) {
-                                log.info("More information at: http://" + jobTrackerLoc
-                                        + "/jobdetails.jsp?jobid=" + job.getAssignedJobID());
-                            }
-
                             // update statistics for this job so jobId is set
                             MRPigStatsUtil.addJobStats(job);
                             MRScriptState.get().emitJobStartedNotification(
@@ -475,10 +492,6 @@ public class MapReduceLauncher extends L
             for (Job job : succJobs) {
                 List<POStore> sts = jcc.getStores(job);
                 for (POStore st : sts) {
-                    if (Utils.isLocal(pc, job.getJobConf())) {
-                        HadoopShims.storeSchemaForLocal(job, st);
-                    }
-
                     if (!st.isTmpStore()) {
                         // create an "_SUCCESS" file in output location if
                         // output location is a filesystem dir
@@ -744,7 +757,7 @@ public class MapReduceLauncher extends L
     @SuppressWarnings("deprecation")
     void computeWarningAggregate(Job job, Map<Enum, Long> aggMap) {
         try {
-            Counters counters = HadoopShims.getCounters(job);
+            Counters counters = MRJobStats.getCounters(job);
             if (counters==null)
             {
                 long nullCounterCount =
@@ -798,13 +811,13 @@ public class MapReduceLauncher extends L
             throw new ExecException(backendException);
         }
         try {
-            Iterator<TaskReport> mapRep = HadoopShims.getTaskReports(job, TaskType.MAP);
+            Iterator<TaskReport> mapRep = MRJobStats.getTaskReports(job, TaskType.MAP);
             if (mapRep != null) {
                 getErrorMessages(mapRep, "map", errNotDbg, pigContext);
                 totalHadoopTimeSpent += computeTimeSpent(mapRep);
                 mapRep = null;
             }
-            Iterator<TaskReport> redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+            Iterator<TaskReport> redRep = MRJobStats.getTaskReports(job, TaskType.REDUCE);
             if (redRep != null) {
                 getErrorMessages(redRep, "reduce", errNotDbg, pigContext);
                 totalHadoopTimeSpent += computeTimeSpent(redRep);
@@ -822,5 +835,6 @@ public class MapReduceLauncher extends L
             throw new ExecException(e);
         }
     }
+
 }
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
 Fri Feb 24 08:19:42 2017
@@ -65,7 +65,10 @@ public class MapReduceOper extends Opera
     // this is needed when the key is null to create
     // an appropriate NullableXXXWritable object
     public byte mapKeyType;
-    
+
+    //record the map key types of all splittees
+    public byte[] mapKeyTypeOfSplittees;
+
     //Indicates that the map plan creation
     //is complete
     boolean mapDone = false;

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
 Fri Feb 24 08:19:42 2017
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -580,18 +581,17 @@ class MultiQueryOptimizer extends MROpPl
     }
 
     private boolean hasSameMapKeyType(List<MapReduceOper> splittees) {
-        boolean sameKeyType = true;
-        for (MapReduceOper outer : splittees) {
-            for (MapReduceOper inner : splittees) {
-                if (inner.mapKeyType != outer.mapKeyType) {
-                    sameKeyType = false;
-                    break;
+        Set<Byte> keyTypes = new HashSet<Byte>();
+        for (MapReduceOper splittee : splittees) {
+            keyTypes.add(splittee.mapKeyType);
+            if (splittee.mapKeyTypeOfSplittees != null) {
+                for (int i = 0; i < splittee.mapKeyTypeOfSplittees.length; i++) {
+                    keyTypes.add(splittee.mapKeyTypeOfSplittees[i]);
                 }
             }
-            if (!sameKeyType) break;
-        }
 
-        return sameKeyType;
+        }
+        return keyTypes.size() == 1;
     }
 
     private int setIndexOnLRInSplit(int initial, POSplit splitOp, boolean sameKeyType)
@@ -1035,10 +1035,20 @@ class MultiQueryOptimizer extends MROpPl
         splitter.mapKeyType = sameKeyType ?
                 mergeList.get(0).mapKeyType : DataType.TUPLE;
 
+
+        setMapKeyTypeForSplitter(splitter,mergeList);
+
         log.info("Requested parallelism of splitter: "
                 + splitter.getRequestedParallelism());
     }
 
+    private void setMapKeyTypeForSplitter(MapReduceOper splitter, List<MapReduceOper> mergeList) {
+        splitter.mapKeyTypeOfSplittees = new byte[mergeList.size()];
+        for (int i = 0; i < mergeList.size(); i++) {
+            splitter.mapKeyTypeOfSplittees[i] = mergeList.get(i).mapKeyType;
+        }
+    }
+
     private void mergeSingleMapReduceSplittee(MapReduceOper mapReduce,
             MapReduceOper splitter, POSplit splitOp) throws VisitorException {
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
 Fri Feb 24 08:19:42 2017
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,9 +37,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -72,7 +75,6 @@ public class PigCombiner {
         PhysicalOperator[] roots;
         PhysicalOperator leaf;
 
-        PigContext pigContext = null;
         private volatile boolean initialized = false;
 
         //@StaticDataCleanup
@@ -91,9 +93,11 @@ public class PigCombiner {
             Configuration jConf = context.getConfiguration();
             try {
                 PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
-                pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
-                if (pigContext.getLog4jProperties()!=null)
-                    PropertyConfigurator.configure(pigContext.getLog4jProperties());
+                Properties log4jProperties = (Properties) ObjectSerializer
+                        .deserialize(jConf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
+                if (log4jProperties != null) {
+                    PropertyConfigurator.configure(log4jProperties);
+                }
                 UDFContext.getUDFContext().reset();
                 MapRedUtil.setupUDFContext(context.getConfiguration());
 
@@ -143,7 +147,7 @@ public class PigCombiner {
                 pigReporter.setRep(context);
                 PhysicalOperator.setReporter(pigReporter);
 
-                boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+                boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
                 PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
                 pigStatusReporter.setContext(new MRTaskContext(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -157,7 +161,7 @@ public class PigCombiner {
             // tuples out of the getnext() call of POJoinPackage
             // In this case, we process till we see EOP from
             // POJoinPacakage.getNext()
-            if (pack.getPkgr() instanceof JoinPackager)
+            if (pack.getPkgr() instanceof JoinPackager || pack.getPkgr() instanceof BloomPackager)
             {
                 pack.attachInput(key, tupIter.iterator());
                 while (true)
@@ -268,7 +272,6 @@ public class PigCombiner {
             pigReporter = null;
             // Avoid OOM in Tez.
             PhysicalOperator.setReporter(null);
-            pigContext = null;
             roots = null;
             cp = null;
         }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
 Fri Feb 24 08:19:42 2017
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +46,7 @@ import org.apache.pig.data.SchemaTupleBa
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -88,7 +90,6 @@ public abstract class PigGenericMapBase
 
     private PhysicalOperator leaf;
 
-    PigContext pigContext = null;
     private volatile boolean initialized = false;
 
     /**
@@ -168,13 +169,15 @@ public abstract class PigGenericMapBase
         inIllustrator = inIllustrator(context);
 
         PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
-        pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
 
         // This attempts to fetch all of the generated code from the distributed cache, and resolve it
-        SchemaTupleBackend.initialize(job, pigContext);
+        SchemaTupleBackend.initialize(job);
 
-        if (pigContext.getLog4jProperties()!=null)
-            PropertyConfigurator.configure(pigContext.getLog4jProperties());
+        Properties log4jProperties = (Properties) ObjectSerializer
+                .deserialize(job.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
+        if (log4jProperties != null) {
+            PropertyConfigurator.configure(log4jProperties);
+        }
 
         if (mp == null)
             mp = (PhysicalPlan) ObjectSerializer.deserialize(
@@ -236,7 +239,7 @@ public abstract class PigGenericMapBase
             pigReporter.setRep(context);
             PhysicalOperator.setReporter(pigReporter);
 
-            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+            boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
             PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
             pigStatusReporter.setContext(new MRTaskContext(context));
             PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -249,8 +252,7 @@ public abstract class PigGenericMapBase
                     MapReducePOStoreImpl impl
                         = new MapReducePOStoreImpl(context);
                     store.setStoreImpl(impl);
-                    if (!pigContext.inIllustrator)
-                        store.setUp();
+                    store.setUp();
                 }
             }
         }
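
Together with the JobControlCompiler hunk earlier in this commit, the pattern is that the frontend no longer serializes the whole PigContext into the job configuration; it stores only narrow entries (a local-mode flag and the log4j properties), which the task-side setup methods read back. A minimal sketch of that round trip, using only classes and constants already named in the diff (the wrapper class name below is hypothetical):

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.log4j.PropertyConfigurator;
    import org.apache.pig.impl.PigImplConstants;
    import org.apache.pig.impl.util.ObjectSerializer;

    class Log4jHandoffSketch {
        // Frontend side (cf. the JobControlCompiler hunk): serialize only the log4j
        // properties into the job configuration instead of the whole PigContext.
        static void store(Configuration conf, Properties log4jProperties) throws IOException {
            conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(log4jProperties));
        }

        // Task side (cf. the PigGenericMapBase/PigCombiner setup hunks): read the
        // properties back and configure log4j only if they were provided.
        static void configure(Configuration conf) throws IOException {
            Properties log4jProperties = (Properties) ObjectSerializer.deserialize(
                    conf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
            if (log4jProperties != null) {
                PropertyConfigurator.configure(log4jProperties);
            }
        }
    }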

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
 Fri Feb 24 08:19:42 2017
@@ -287,7 +287,6 @@ public class PigGenericMapReduce {
 
         private PhysicalOperator leaf;
 
-        PigContext pigContext = null;
         protected volatile boolean initialized = false;
 
         private boolean inIllustrator = false;
@@ -319,10 +318,9 @@ public class PigGenericMapReduce {
             sJobConf = context.getConfiguration();
             try {
                 PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
-                pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
 
                 // This attempts to fetch all of the generated code from the distributed cache, and resolve it
-                SchemaTupleBackend.initialize(jConf, pigContext);
+                SchemaTupleBackend.initialize(jConf);
 
                 if (rp == null)
                     rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
@@ -377,7 +375,7 @@ public class PigGenericMapReduce {
                 pigReporter.setRep(context);
                 PhysicalOperator.setReporter(pigReporter);
 
-                boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+                boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
                 PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
                 pigStatusReporter.setContext(new MRTaskContext(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -608,7 +606,7 @@ public class PigGenericMapReduce {
                 pigReporter.setRep(context);
                 PhysicalOperator.setReporter(pigReporter);
 
-                boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+                boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
                 PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
                 pigStatusReporter.setContext(new MRTaskContext(context));
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
 Fri Feb 24 08:19:42 2017
@@ -17,9 +17,6 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
-import java.util.Map;
-import java.util.WeakHashMap;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.EvalFunc;
@@ -41,7 +38,6 @@ public final class PigHadoopLogger imple
 
     private PigStatusReporter reporter = null;
     private boolean aggregate = false;
-    private Map<Object, String> msgMap = new WeakHashMap<Object, String>();
 
     private PigHadoopLogger() {
     }
@@ -68,11 +64,6 @@ public final class PigHadoopLogger imple
 
         if (getAggregate()) {
             if (reporter != null) {
-                // log at least once
-                if (msgMap.get(o) == null || !msgMap.get(o).equals(displayMessage)) {
-                    log.warn(displayMessage);
-                    msgMap.put(o, displayMessage);
-                }
                 if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
                     reporter.incrCounter(className, warningEnum.name(), 1);
                 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
 Fri Feb 24 08:19:42 2017
@@ -197,14 +197,11 @@ public class PigInputFormat extends Inpu
 
         ArrayList<FileSpec> inputs;
         ArrayList<ArrayList<OperatorKey>> inpTargets;
-        PigContext pigContext;
         try {
             inputs = (ArrayList<FileSpec>) ObjectSerializer
                     .deserialize(conf.get(PIG_INPUTS));
             inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
                     .deserialize(conf.get(PIG_INPUT_TARGETS));
-            pigContext = (PigContext) ObjectSerializer.deserialize(conf
-                    .get("pig.pigContext"));
             
PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list")));
             MapRedUtil.setupUDFContext(conf);
         } catch (Exception e) {
@@ -234,7 +231,7 @@ public class PigInputFormat extends Inpu
 
                 // if the execution is against Mapred DFS, set
                 // working dir to /user/<userid>
-                if(!Utils.isLocal(pigContext, conf)) {
+                if(!Utils.isLocal(conf)) {
                     fs.setWorkingDirectory(jobcontext.getWorkingDirectory());
                 }
 
@@ -270,7 +267,7 @@ public class PigInputFormat extends Inpu
                                 jobcontext.getJobID()));
                 List<InputSplit> oneInputPigSplits = getPigSplits(
                         oneInputSplits, i, inpTargets.get(i),
-                        HadoopShims.getDefaultBlockSize(fs, isFsPath? path: fs.getWorkingDirectory()),
+                        fs.getDefaultBlockSize(isFsPath? path: fs.getWorkingDirectory()),
                         combinable, confClone);
                 splits.addAll(oneInputPigSplits);
             } catch (ExecException ee) {

