Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/PigATSClient.java Fri Feb 24 08:19:42 2017 @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop; + +import java.io.File; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.client.api.TimelineClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; +import org.apache.pig.impl.util.Utils; +import org.apache.pig.tools.pigstats.ScriptState; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +public class PigATSClient { + public static class ATSEvent { + public ATSEvent(String pigAuditId, String callerId) { + this.pigScriptId = pigAuditId; + this.callerId = callerId; + } + String callerId; + String pigScriptId; + } + public static final String ENTITY_TYPE = "PIG_SCRIPT_ID"; + public static final String ENTITY_CALLERID = "callerId"; + public static final String CALLER_CONTEXT = "PIG"; + public static final int AUDIT_ID_MAX_LENGTH = 128; + + private static final Log log = LogFactory.getLog(PigATSClient.class.getName()); + private static PigATSClient instance; + private static ExecutorService executor; + private TimelineClient timelineClient; + + public static synchronized PigATSClient getInstance() { + if (instance==null) { + instance = new PigATSClient(); + } + return instance; + } + + private PigATSClient() { + if (executor == null) { + executor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ATS Logger %d").build()); + YarnConfiguration yarnConf = new YarnConfiguration(); + timelineClient = TimelineClient.createTimelineClient(); + timelineClient.init(yarnConf); + timelineClient.start(); + } + Utils.addShutdownHookWithPriority(new Runnable() { + @Override + public void run() { + timelineClient.stop(); + executor.shutdownNow(); + executor = null; + } + }, PigImplConstants.SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY); + log.info("Created ATS Hook"); + } + + public static String getPigAuditId(PigContext context) { + String auditId; + if (context.getProperties().get(PigImplConstants.PIG_AUDIT_ID) != null) { + 
auditId = (String)context.getProperties().get(PigImplConstants.PIG_AUDIT_ID); + } else { + ScriptState ss = ScriptState.get(); + String filename = ss.getFileName().isEmpty()?"default" : new File(ss.getFileName()).getName(); + auditId = CALLER_CONTEXT + "-" + filename + "-" + ss.getId(); + } + return auditId.substring(0, Math.min(auditId.length(), AUDIT_ID_MAX_LENGTH)); + } + + synchronized public void logEvent(final ATSEvent event) { + executor.submit(new Runnable() { + @Override + public void run() { + TimelineEntity entity = new TimelineEntity(); + entity.setEntityId(event.pigScriptId); + entity.setEntityType(ENTITY_TYPE); + entity.addPrimaryFilter(ENTITY_CALLERID, event.callerId!=null?event.callerId : "default"); + try { + timelineClient.putEntities(entity); + } catch (Exception e) { + log.info("Failed to submit plan to ATS: " + e.getMessage()); + } + } + }); + } +}
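For context, a minimal usage sketch of the client added above. It calls only methods defined in PigATSClient; the PigContext instance and the caller-id value are assumptions for illustration, not part of the commit:

    import org.apache.pig.backend.hadoop.PigATSClient;
    import org.apache.pig.impl.PigContext;

    public class AtsUsageSketch {
        // Publish a script event to the YARN Application Timeline Server.
        // `pigContext` is assumed to be an already-initialized PigContext; the
        // caller id is a hypothetical placeholder, not a documented Pig property.
        public static void logScript(PigContext pigContext) {
            String auditId = PigATSClient.getPigAuditId(pigContext);
            PigATSClient.ATSEvent event =
                new PigATSClient.ATSEvent(auditId, "hypothetical-caller-id");
            // logEvent() is asynchronous: the TimelineEntity is built and
            // submitted on the client's single daemon "ATS Logger" thread.
            PigATSClient.getInstance().logEvent(event);
        }
    }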
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/PigJobControl.java Fri Feb 24 08:19:42 2017 @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Iterator; +import java.util.LinkedList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.jobcontrol.JobControl; +import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob; +import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.State; + +/** + * extends the hadoop JobControl to remove the hardcoded sleep(5000) + * as most of this is private we have to use reflection + * + * See {@link https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23.1/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/jobcontrol/JobControl.java } + * + */ +public class PigJobControl extends JobControl { + private static final Log log = LogFactory.getLog(PigJobControl.class); + + private static Field runnerState; + private static Field jobsInProgress; + private static Field successfulJobs; + private static Field failedJobs; + + private static Method failAllJobs; + + private static Method checkState; + private static Method submit; + + private static boolean initSuccesful; + + static { + try { + + runnerState = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("runnerState"); + runnerState.setAccessible(true); + jobsInProgress = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("jobsInProgress"); + jobsInProgress.setAccessible(true); + successfulJobs = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("successfulJobs"); + successfulJobs.setAccessible(true); + failedJobs = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredField("failedJobs"); + failedJobs.setAccessible(true); + + failAllJobs = org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.class.getDeclaredMethod("failAllJobs", Throwable.class); + failAllJobs.setAccessible(true); + + checkState = ControlledJob.class.getDeclaredMethod("checkState"); + checkState.setAccessible(true); + submit = ControlledJob.class.getDeclaredMethod("submit"); + submit.setAccessible(true); + + initSuccesful 
= true; + } catch (Exception e) { + log.debug("falling back to default JobControl (not using hadoop 0.23 ?)", e); + initSuccesful = false; + } + } + + protected int timeToSleep; + + /** + * Construct a job control for a group of jobs. + * @param groupName a name identifying this group + * @param pigContext + * @param conf + */ + public PigJobControl(String groupName, int timeToSleep) { + super(groupName); + this.timeToSleep = timeToSleep; + } + + public int getTimeToSleep() { + return timeToSleep; + } + + public void setTimeToSleep(int timeToSleep) { + this.timeToSleep = timeToSleep; + } + + private void setRunnerState(ThreadState state) { + try { + runnerState.set(this, state); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + private ThreadState getRunnerState() { + try { + return (ThreadState)runnerState.get(this); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private State checkState(ControlledJob j) { + try { + return (State)checkState.invoke(j); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private State submit(ControlledJob j) { + try { + return (State)submit.invoke(j); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @SuppressWarnings("unchecked") + private LinkedList<ControlledJob> getJobs(Field field) { + try { + return (LinkedList<ControlledJob>)field.get(this); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void failAllJobs(Throwable t) { + try { + failAllJobs.invoke(this, t); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * The main loop for the thread. + * The loop does the following: + * Check the states of the running jobs + * Update the states of waiting jobs + * Submit the jobs in ready state + */ + public void run() { + if (!initSuccesful) { + super.run(); + return; + } + try { + setRunnerState(ThreadState.RUNNING); + while (true) { + while (getRunnerState() == ThreadState.SUSPENDED) { + try { + Thread.sleep(timeToSleep); + } + catch (Exception e) { + //TODO the thread was interrupted, do something!!! + } + } + + synchronized(this) { + Iterator<ControlledJob> it = getJobs(jobsInProgress).iterator(); + if (!it.hasNext()) { + stop(); + } + while(it.hasNext()) { + ControlledJob j = it.next(); + + // TODO: Need to re-visit the following try...catch + // when Pig picks up a Hadoop release with MAPREDUCE-6762 applied + // as its dependency. + try { + log.debug("Checking state of job " + j); + } catch(NullPointerException npe) { + log.warn("Failed to get job name " + + "when checking state of job. " + + "Check if job status is null.", npe); + } + + switch(checkState(j)) { + case SUCCESS: + getJobs(successfulJobs).add(j); + it.remove(); + break; + case FAILED: + case DEPENDENT_FAILED: + getJobs(failedJobs).add(j); + it.remove(); + break; + case READY: + submit(j); + break; + case RUNNING: + case WAITING: + //Do Nothing + break; + } + } + } + + if (getRunnerState() != ThreadState.RUNNING && + getRunnerState() != ThreadState.SUSPENDED) { + break; + } + try { + Thread.sleep(timeToSleep); + } + catch (Exception e) { + //TODO the thread was interrupted, do something!!! + } + if (getRunnerState() != ThreadState.RUNNING && + getRunnerState() != ThreadState.SUSPENDED) { + break; + } + } + }catch(Throwable t) { + log.error("Error while trying to run jobs.",t); + //Mark all jobs as failed because we got something bad. 
+ failAllJobs(t); + } + setRunnerState(ThreadState.STOPPED); + } + + +} Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java Fri Feb 24 08:19:42 2017 @@ -17,8 +17,6 @@ package org.apache.pig.backend.hadoop.accumulo; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.math.BigDecimal; import java.math.BigInteger; import java.util.Collection; @@ -303,24 +301,8 @@ public abstract class AbstractAccumuloSt */ protected void simpleUnset(Configuration conf, Map<String, String> entriesToUnset) { - try { - Method unset = conf.getClass().getMethod("unset", String.class); - - for (String key : entriesToUnset.keySet()) { - unset.invoke(conf, key); - } - } catch (NoSuchMethodException e) { - log.error("Could not invoke Configuration.unset method", e); - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - log.error("Could not invoke Configuration.unset method", e); - throw new RuntimeException(e); - } catch (IllegalArgumentException e) { - log.error("Could not invoke Configuration.unset method", e); - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - log.error("Could not invoke Configuration.unset method", e); - throw new RuntimeException(e); + for (String key : entriesToUnset.keySet()) { + conf.unset(key); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java Fri Feb 24 08:19:42 2017 @@ -22,8 +22,6 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.net.URL; import java.net.URLDecoder; import java.text.MessageFormat; @@ -42,6 +40,7 @@ import java.util.zip.ZipOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.util.StringUtils; import org.apache.log4j.Logger; @@ -112,7 +111,7 @@ public class Utils { // attempt to locate an existing jar for the class. String jar = findContainingJar(my_class, packagedClasses); if (null == jar || jar.isEmpty()) { - jar = getJar(my_class); + jar = JarFinder.getJar(my_class); updateMap(jar, packagedClasses); } @@ -200,41 +199,6 @@ public class Utils { } /** - * Invoke 'getJar' on a JarFinder implementation. Useful for some job - * configuration contexts (HBASE-8140) and also for testing on MRv2. First - * check if we have HADOOP-9426. Lacking that, fall back to the backport. - * - * @param my_class - * the class to find. 
- * @return a jar file that contains the class, or null. - */ - private static String getJar(Class<?> my_class) { - String ret = null; - String hadoopJarFinder = "org.apache.hadoop.util.JarFinder"; - Class<?> jarFinder = null; - try { - log.debug("Looking for " + hadoopJarFinder + "."); - jarFinder = Class.forName(hadoopJarFinder); - log.debug(hadoopJarFinder + " found."); - Method getJar = jarFinder.getMethod("getJar", Class.class); - ret = (String) getJar.invoke(null, my_class); - } catch (ClassNotFoundException e) { - log.debug("Using backported JarFinder."); - ret = jarFinderGetJar(my_class); - } catch (InvocationTargetException e) { - // function was properly called, but threw it's own exception. - // Unwrap it - // and pass it on. - throw new RuntimeException(e.getCause()); - } catch (Exception e) { - // toss all other exceptions, related to reflection failure - throw new RuntimeException("getJar invocation failed.", e); - } - - return ret; - } - - /** * Returns the full path to the Jar containing the class. It always return a * JAR. * Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java Fri Feb 24 08:19:42 2017 @@ -29,7 +29,6 @@ import org.apache.pig.ExecType; import org.apache.pig.PigConstants; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; public class ConfigurationUtil { @@ -89,7 +88,7 @@ public class ConfigurationUtil { // so build/classes/hadoop-site.xml contains such entry. This prevents some tests from // successful (They expect those files in hdfs), so we need to unset it in hadoop 23. // This should go away once MiniMRCluster fix the distributed cache issue. 
- HadoopShims.unsetConf(localConf, MRConfiguration.JOB_CACHE_FILES); + localConf.unset(MRConfiguration.JOB_CACHE_FILES); } localConf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///"); Properties props = ConfigurationUtil.toProperties(localConf); @@ -106,4 +105,14 @@ public class ConfigurationUtil { } } } + + /** + * Returns Properties containing alternative names of given property and same values - can be used to solve deprecations + * @return + */ + public static Properties expandForAlternativeNames(String name, String value){ + final Configuration config = new Configuration(false); + config.set(name,value); + return ConfigurationUtil.toProperties(config); + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java Fri Feb 24 08:19:42 2017 @@ -18,20 +18,20 @@ package org.apache.pig.backend.hadoop.datastorage; -import java.net.URI; import java.io.IOException; +import java.net.URI; import java.util.ArrayList; -import java.util.List; -import java.util.Properties; import java.util.Enumeration; -import java.util.Map; import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.conf.Configuration; import org.apache.pig.PigException; import org.apache.pig.backend.datastorage.ContainerDescriptor; import org.apache.pig.backend.datastorage.DataStorage; @@ -40,8 +40,6 @@ import org.apache.pig.backend.datastorag public class HDataStorage implements DataStorage { - private static final String FILE_SYSTEM_LOCATION = "fs.default.name"; - private FileSystem fs; private Configuration configuration; private Properties properties; @@ -58,9 +56,10 @@ public class HDataStorage implements Dat init(); } + @Override public void init() { // check if name node is set, if not we set local as fail back - String nameNode = this.properties.getProperty(FILE_SYSTEM_LOCATION); + String nameNode = this.properties.getProperty(FileSystem.FS_DEFAULT_NAME_KEY); if (nameNode == null || nameNode.length() == 0) { nameNode = "local"; } @@ -76,14 +75,17 @@ public class HDataStorage implements Dat } } + @Override public void close() throws IOException { fs.close(); } - + + @Override public Properties getConfiguration() { return this.properties; } + @Override public void updateConfiguration(Properties newConfiguration) throws DataStorageException { // TODO sgroschupf 25Feb2008 this method is never called and @@ -92,38 +94,40 @@ public class HDataStorage implements Dat if (newConfiguration == null) { return; } - + Enumeration<Object> newKeys = newConfiguration.keys(); - + while (newKeys.hasMoreElements()) { String key = (String) newKeys.nextElement(); String value = null; - + value = newConfiguration.getProperty(key); - + fs.getConf().set(key,value); } } - + + @Override public Map<String, Object> getStatistics() throws IOException { Map<String, Object> stats = new HashMap<String, Object>(); long usedBytes = 
fs.getUsed(); stats.put(USED_BYTES_KEY , Long.valueOf(usedBytes).toString()); - + if (fs instanceof DistributedFileSystem) { DistributedFileSystem dfs = (DistributedFileSystem) fs; - + long rawCapacityBytes = dfs.getRawCapacity(); stats.put(RAW_CAPACITY_KEY, Long.valueOf(rawCapacityBytes).toString()); - + long rawUsedBytes = dfs.getRawUsed(); stats.put(RAW_USED_KEY, Long.valueOf(rawUsedBytes).toString()); } - + return stats; } - + + @Override public ElementDescriptor asElement(String name) throws DataStorageException { if (this.isContainer(name)) { return new HDirectory(this, name); @@ -132,70 +136,82 @@ public class HDataStorage implements Dat return new HFile(this, name); } } - + + @Override public ElementDescriptor asElement(ElementDescriptor element) throws DataStorageException { return asElement(element.toString()); } - + + @Override public ElementDescriptor asElement(String parent, - String child) + String child) throws DataStorageException { return asElement((new Path(parent, child)).toString()); } + @Override public ElementDescriptor asElement(ContainerDescriptor parent, - String child) + String child) throws DataStorageException { return asElement(parent.toString(), child); } + @Override public ElementDescriptor asElement(ContainerDescriptor parent, - ElementDescriptor child) + ElementDescriptor child) throws DataStorageException { return asElement(parent.toString(), child.toString()); } - public ContainerDescriptor asContainer(String name) + @Override + public ContainerDescriptor asContainer(String name) throws DataStorageException { return new HDirectory(this, name); } - + + @Override public ContainerDescriptor asContainer(ContainerDescriptor container) throws DataStorageException { return new HDirectory(this, container.toString()); } - + + @Override public ContainerDescriptor asContainer(String parent, - String child) + String child) throws DataStorageException { return new HDirectory(this, parent, child); } + @Override public ContainerDescriptor asContainer(ContainerDescriptor parent, - String child) + String child) throws DataStorageException { return new HDirectory(this, parent.toString(), child); } - + + @Override public ContainerDescriptor asContainer(ContainerDescriptor parent, ContainerDescriptor child) throws DataStorageException { return new HDirectory(this, parent.toString(), child.toString()); } - + + @Override public void setActiveContainer(ContainerDescriptor container) { fs.setWorkingDirectory(new Path(container.toString())); } - + + @Override public ContainerDescriptor getActiveContainer() { return new HDirectory(this, fs.getWorkingDirectory()); } + @Override public boolean isContainer(String name) throws DataStorageException { boolean isContainer = false; Path path = new Path(name); - + try { if ((this.fs.exists(path)) && (! 
this.fs.isFile(path))) { isContainer = true; @@ -206,10 +222,11 @@ public class HDataStorage implements Dat String msg = "Unable to check name " + name; throw new DataStorageException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e); } - + return isContainer; } - + + @Override public HPath[] asCollection(String pattern) throws DataStorageException { try { FileStatus[] paths = this.fs.globStatus(new Path(pattern)); @@ -218,7 +235,7 @@ public class HDataStorage implements Dat return new HPath[0]; List<HPath> hpaths = new ArrayList<HPath>(); - + for (int i = 0; i < paths.length; ++i) { HPath hpath = (HPath)this.asElement(paths[i].getPath().toString()); if (!hpath.systemElement()) { @@ -233,7 +250,7 @@ public class HDataStorage implements Dat throw new DataStorageException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e); } } - + public FileSystem getHFS() { return fs; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Feb 24 08:19:42 2017 @@ -30,6 +30,7 @@ import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.mapred.JobConf; import org.apache.pig.PigException; @@ -76,8 +77,6 @@ public abstract class HExecutionEngine i public static final String MAPRED_DEFAULT_SITE = "mapred-default.xml"; public static final String YARN_DEFAULT_SITE = "yarn-default.xml"; - public static final String FILE_SYSTEM_LOCATION = "fs.default.name"; - public static final String ALTERNATIVE_FILE_SYSTEM_LOCATION = "fs.defaultFS"; public static final String LOCAL = "local"; protected PigContext pigContext; @@ -203,8 +202,8 @@ public abstract class HExecutionEngine i properties.setProperty(MRConfiguration.FRAMEWORK_NAME, LOCAL); } properties.setProperty(MRConfiguration.JOB_TRACKER, LOCAL); - properties.setProperty(FILE_SYSTEM_LOCATION, "file:///"); - properties.setProperty(ALTERNATIVE_FILE_SYSTEM_LOCATION, "file:///"); + properties.remove("fs.default.name"); //Deprecated in Hadoop 2.x + properties.setProperty(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); jc = getLocalConf(); JobConf s3Jc = getS3Conf(); @@ -220,24 +219,7 @@ public abstract class HExecutionEngine i HKerberos.tryKerberosKeytabLogin(jc); cluster = jc.get(MRConfiguration.JOB_TRACKER); - nameNode = jc.get(FILE_SYSTEM_LOCATION); - if (nameNode == null) { - nameNode = (String) pigContext.getProperties().get(ALTERNATIVE_FILE_SYSTEM_LOCATION); - } - - if (cluster != null && cluster.length() > 0) { - if (!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) { - cluster = cluster + ":50020"; - } - properties.setProperty(MRConfiguration.JOB_TRACKER, cluster); - } - - if (nameNode != null && nameNode.length() > 0) { - if (!nameNode.contains(":") && !nameNode.equalsIgnoreCase(LOCAL)) { - nameNode = nameNode + ":8020"; - } - properties.setProperty(FILE_SYSTEM_LOCATION, nameNode); - } + nameNode = jc.get(FileSystem.FS_DEFAULT_NAME_KEY); 
LOG.info("Connecting to hadoop file system at: " + (nameNode == null ? LOCAL : nameNode)); @@ -369,7 +351,11 @@ public abstract class HExecutionEngine i @Override public void setProperty(String property, String value) { Properties properties = pigContext.getProperties(); - properties.put(property, value); + if (Configuration.isDeprecated(property)) { + properties.putAll(ConfigurationUtil.expandForAlternativeNames(property, value)); + } else { + properties.put(property, value); + } } @Override @@ -378,6 +364,13 @@ public abstract class HExecutionEngine i } @Override + public void kill() throws BackendException { + if (launcher != null) { + launcher.kill(); + } + } + + @Override public void killJob(String jobID) throws BackendException { if (launcher != null) { launcher.killJob(jobID, getJobConf()); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HJob.java Fri Feb 24 08:19:42 2017 @@ -40,7 +40,7 @@ import org.apache.pig.tools.pigstats.Pig public class HJob implements ExecJob { private final Log log = LogFactory.getLog(getClass()); - + protected JOB_STATUS status; protected PigContext pigContext; protected FileSpec outFileSpec; @@ -48,7 +48,7 @@ public class HJob implements ExecJob { protected String alias; protected POStore poStore; private PigStats stats; - + public HJob(JOB_STATUS status, PigContext pigContext, POStore store, @@ -59,7 +59,7 @@ public class HJob implements ExecJob { this.outFileSpec = poStore.getSFile(); this.alias = alias; } - + public HJob(JOB_STATUS status, PigContext pigContext, POStore store, @@ -72,37 +72,41 @@ public class HJob implements ExecJob { this.alias = alias; this.stats = stats; } - + + @Override public JOB_STATUS getStatus() { return status; } - + + @Override public boolean hasCompleted() throws ExecException { return true; } - + + @Override public Iterator<Tuple> getResults() throws ExecException { final LoadFunc p; - + try{ - LoadFunc originalLoadFunc = + LoadFunc originalLoadFunc = (LoadFunc)PigContext.instantiateFuncFromSpec( outFileSpec.getFuncSpec()); - - p = (LoadFunc) new ReadToEndLoader(originalLoadFunc, + + p = (LoadFunc) new ReadToEndLoader(originalLoadFunc, ConfigurationUtil.toConfiguration( - pigContext.getProperties()), outFileSpec.getFileName(), 0, pigContext); + pigContext.getProperties()), outFileSpec.getFileName(), 0); }catch (Exception e){ int errCode = 2088; String msg = "Unable to get results for: " + outFileSpec; throw new ExecException(msg, errCode, PigException.BUG, e); } - + return new Iterator<Tuple>() { Tuple t; boolean atEnd; + @Override public boolean hasNext() { if (atEnd) return false; @@ -120,6 +124,7 @@ public class HJob implements ExecJob { return !atEnd; } + @Override public Tuple next() { Tuple next = t; if (next != null) { @@ -136,6 +141,7 @@ public class HJob implements ExecJob { return next; } + @Override public void remove() { throw new RuntimeException("Removal not supported"); } @@ -143,31 +149,38 @@ public class HJob implements ExecJob { }; } + @Override public Properties getConfiguration() { return pigContext.getProperties(); } + @Override public PigStats getStatistics() { 
//throw new UnsupportedOperationException(); return stats; } + @Override public void completionNotification(Object cookie) { throw new UnsupportedOperationException(); } - + + @Override public void kill() throws ExecException { throw new UnsupportedOperationException(); } - + + @Override public void getLogs(OutputStream log) throws ExecException { throw new UnsupportedOperationException(); } - + + @Override public void getSTDOut(OutputStream out) throws ExecException { throw new UnsupportedOperationException(); } - + + @Override public void getSTDError(OutputStream error) throws ExecException { throw new UnsupportedOperationException(); } @@ -176,6 +189,7 @@ public class HJob implements ExecJob { backendException = e; } + @Override public Exception getException() { return backendException; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Fri Feb 24 08:19:42 2017 @@ -32,7 +32,8 @@ import java.util.regex.Pattern; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.TaskReport; +import org.apache.hadoop.mapred.TIPStatus; +import org.apache.hadoop.mapreduce.TaskReport; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; import org.apache.pig.FuncSpec; @@ -40,7 +41,6 @@ import org.apache.pig.PigException; import org.apache.pig.backend.BackendException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.PlanException; @@ -76,7 +76,7 @@ public abstract class Launcher { protected Map<FileSpec, Exception> failureMap; protected JobControl jc = null; - class HangingJobKiller extends Thread { + protected class HangingJobKiller extends Thread { public HangingJobKiller() {} @Override @@ -90,7 +90,6 @@ public abstract class Launcher { } protected Launcher() { - Runtime.getRuntime().addShutdownHook(new HangingJobKiller()); // handle the windows portion of \r if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) { newLine = "\r\n"; @@ -104,7 +103,6 @@ public abstract class Launcher { public void reset() { failureMap = Maps.newHashMap(); totalHadoopTimeSpent = 0; - jc = null; } /** @@ -179,7 +177,7 @@ public abstract class Launcher { String exceptionCreateFailMsg = null; boolean jobFailed = false; if (msgs.length > 0) { - if (HadoopShims.isJobFailed(report)) { + if (report.getCurrentStatus()== TIPStatus.FAILED) { jobFailed = true; } Set<String> errorMessageSet = new HashSet<String>(); @@ -261,11 +259,30 @@ public abstract class Launcher { List<Job> runnJobs = jc.getRunningJobs(); for (Job j : runnJobs) { - prog += HadoopShims.progressOfRunningJob(j); + prog += progressOfRunningJob(j); } return prog; } + /** + * Returns the progress of a Job j which is part of a submitted JobControl + * object. 
The progress is for this Job. So it has to be scaled down by the + * num of jobs that are present in the JobControl. + * + * @param j The Job for which progress is required + * @return Returns the percentage progress of this Job + * @throws IOException + */ + private static double progressOfRunningJob(Job j) + throws IOException { + org.apache.hadoop.mapreduce.Job mrJob = j.getJob(); + try { + return (mrJob.mapProgress() + mrJob.reduceProgress()) / 2; + } catch (Exception ir) { + return 0; + } + } + public long getTotalHadoopTimeSpent() { return totalHadoopTimeSpent; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Fri Feb 24 08:19:42 2017 @@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.TaskA import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor; @@ -122,7 +123,8 @@ public class FetchLauncher { poStore.setUp(); TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID(); - HadoopShims.setTaskAttemptId(conf, taskAttemptID); + //Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop + conf.setInt(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, taskAttemptID.getId()); if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) { MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchPOStoreImpl.java Fri Feb 24 08:19:42 2017 @@ -95,7 +95,7 @@ public class FetchPOStoreImpl extends PO } if (outputCommitter.needsTaskCommit(context)) outputCommitter.commitTask(context); - HadoopShims.commitOrCleanup(outputCommitter, context); + outputCommitter.commitJob(context); } @Override @@ -109,7 +109,7 @@ public class FetchPOStoreImpl extends PO } writer = null; } - HadoopShims.commitOrCleanup(outputCommitter, context); + outputCommitter.commitJob(context); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java?rev=1784237&r1=1784236&r2=1784237&view=diff 
============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/DistinctCombiner.java Fri Feb 24 08:19:42 2017 @@ -22,43 +22,48 @@ import java.util.Iterator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Reducer; - -import org.apache.pig.impl.io.PigNullableWritable; import org.apache.pig.impl.io.NullableTuple; +import org.apache.pig.impl.io.PigNullableWritable; /** * A special implementation of combiner used only for distinct. This combiner * does not even parse out the records. It just throws away duplicate values - * in the key in order ot minimize the data being sent to the reduce. + * in the key in order to minimize the data being sent to the reduce. */ public class DistinctCombiner { - public static class Combine + public static class Combine extends Reducer<PigNullableWritable, NullableTuple, PigNullableWritable, Writable> { - + private final Log log = LogFactory.getLog(getClass()); - ProgressableReporter pigReporter; - - /** - * Configures the reporter - */ + private static boolean firstTime = true; + + //@StaticDataCleanup + public static void staticDataCleanup() { + firstTime = true; + } + @Override protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); - pigReporter = new ProgressableReporter(); + Configuration jConf = context.getConfiguration(); + // Avoid log spamming + if (firstTime) { + log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location")); + firstTime = false; + } } - + /** * The reduce function which removes values. */ @Override - protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) + protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) throws IOException, InterruptedException { - - pigReporter.setRep(context); // Take the first value and the key and collect // just that. 
@@ -66,6 +71,7 @@ public class DistinctCombiner { NullableTuple val = iter.next(); context.write(key, val); } + } - + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java Fri Feb 24 08:19:42 2017 @@ -75,16 +75,24 @@ public class FileBasedOutputSizeReader i return -1; } - long bytes = 0; Path p = new Path(getLocationUri(sto)); - FileSystem fs = p.getFileSystem(conf); - FileStatus[] lst = fs.listStatus(p); + return getPathSize(p, p.getFileSystem(conf)); + } + + private long getPathSize(Path storePath, FileSystem fs) throws IOException { + long bytes = 0; + FileStatus[] lst = fs.listStatus(storePath); if (lst != null) { for (FileStatus status : lst) { - bytes += status.getLen(); + if (status.isFile()) { + if (status.getLen() > 0) + bytes += status.getLen(); + } + else { // recursively count nested leaves' (files) sizes + bytes += getPathSize(status.getPath(), fs); + } } } - return bytes; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Fri Feb 24 08:19:42 2017 @@ -92,7 +92,7 @@ public class InputSizeReducerEstimator i return reducers; } - static long getTotalInputFileSize(Configuration conf, + public static long getTotalInputFileSize(Configuration conf, List<POLoad> lds, Job job) throws IOException { return getTotalInputFileSize(conf, lds, job, Long.MAX_VALUE); } @@ -100,7 +100,7 @@ public class InputSizeReducerEstimator i /** * Get the input size for as many inputs as possible. Inputs that do not report * their size nor can pig look that up itself are excluded from this size. 
- * + * * @param conf Configuration * @param lds List of POLoads * @param job Job Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Feb 24 08:19:42 2017 @@ -24,7 +24,6 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.lang.reflect.Method; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; @@ -61,6 +60,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobPriority; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.jobcontrol.JobControl; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.pig.ComparisonFunc; import org.apache.pig.ExecType; import org.apache.pig.FuncSpec; @@ -71,6 +71,7 @@ import org.apache.pig.PigException; import org.apache.pig.StoreFuncInterface; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.HDataType; +import org.apache.pig.backend.hadoop.PigJobControl; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.JobCreationException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner; @@ -89,7 +90,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataType; @@ -122,6 +122,7 @@ import org.apache.pig.impl.util.ObjectSe import org.apache.pig.impl.util.Pair; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.Utils; +import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil; import org.apache.pig.tools.pigstats.mapreduce.MRScriptState; @@ -311,7 +312,7 @@ public class JobControlCompiler{ " should be a time in ms. 
default=" + defaultPigJobControlSleep, e); } - JobControl jobCtrl = HadoopShims.newJobControl(grpName, timeToSleep); + JobControl jobCtrl = new PigJobControl(grpName, timeToSleep); try { List<MapReduceOper> roots = new LinkedList<MapReduceOper>(); @@ -384,7 +385,7 @@ public class JobControlCompiler{ ArrayList<Pair<String,Long>> counterPairs; try { - counters = HadoopShims.getCounters(job); + counters = MRJobStats.getCounters(job); String groupName = getGroupName(counters.getGroupNames()); // In case that the counter group was not find, we need to find @@ -702,7 +703,8 @@ public class JobControlCompiler{ // since this path would be invalid for the new job being created pigContext.getProperties().remove("mapreduce.job.credentials.binary"); - conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext)); + conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pigContext.getExecType().isLocal()); + conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pigContext.getLog4jProperties())); conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList())); // this is for unit tests since some don't create PigServer @@ -1671,14 +1673,6 @@ public class JobControlCompiler{ if (distCachePath != null) { log.info("Jar file " + url + " already in DistributedCache as " + distCachePath + ". Not copying to hdfs and adding again"); - // Path already in dist cache - if (!HadoopShims.isHadoopYARN()) { - // Mapreduce in YARN includes $PWD/* which will add all *.jar files in classapth. - // So don't have to ensure that the jar is separately added to mapreduce.job.classpath.files - // But path may only be in 'mapred.cache.files' and not be in - // 'mapreduce.job.classpath.files' in Hadoop 1.x. So adding it there - DistributedCache.addFileToClassPath(distCachePath, conf, distCachePath.getFileSystem(conf)); - } } else { // REGISTER always copies locally the jar file. see PigServer.registerJar() @@ -1964,20 +1958,9 @@ public class JobControlCompiler{ public static void setOutputFormat(org.apache.hadoop.mapreduce.Job job) { // the OutputFormat we report to Hadoop is always PigOutputFormat which - // can be wrapped with LazyOutputFormat provided if it is supported by - // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set + // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) { - try { - Class<?> clazz = PigContext - .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat"); - Method method = clazz.getMethod("setOutputFormatClass", - org.apache.hadoop.mapreduce.Job.class, Class.class); - method.invoke(null, job, PigOutputFormat.class); - } catch (Exception e) { - job.setOutputFormatClass(PigOutputFormat.class); - log.warn(PigConfiguration.PIG_OUTPUT_LAZY - + " is set but LazyOutputFormat couldn't be loaded. 
Default PigOutputFormat will be used"); - } + LazyOutputFormat.setOutputFormatClass(job,PigOutputFormat.class); } else { job.setOutputFormatClass(PigOutputFormat.class); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Feb 24 08:19:42 2017 @@ -1116,7 +1116,9 @@ public class MRCompiler extends PhyPlanV try{ nonBlocking(op); phyToMROpMap.put(op, curMROp); - if (op.getPkgr().getPackageType() == PackageType.JOIN) { + if (op.getPkgr().getPackageType() == PackageType.JOIN + || op.getPkgr().getPackageType() == PackageType.BLOOMJOIN) { + // Bloom join is not implemented in mapreduce mode and falls back to regular join curMROp.markRegularJoin(); } else if (op.getPkgr().getPackageType() == PackageType.GROUP) { if (op.getNumInps() == 1) { @@ -1278,7 +1280,7 @@ public class MRCompiler extends PhyPlanV List<InputSplit> splits = inf.getSplits(HadoopShims.cloneJobContext(job)); List<List<InputSplit>> results = MapRedUtil .getCombinePigSplits(splits, - HadoopShims.getDefaultBlockSize(fs, path), + fs.getDefaultBlockSize(path), conf); numFiles += results.size(); } else { @@ -2432,7 +2434,7 @@ public class MRCompiler extends PhyPlanV }else{ for(int i=0; i<transformPlans.size(); i++) { eps1.add(transformPlans.get(i)); - flat1.add(true); + flat1.add(i == transformPlans.size() - 1 ? 
true : false); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Feb 24 08:19:42 2017 @@ -19,7 +19,9 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.io.PrintStream; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Calendar; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -40,7 +42,8 @@ import org.apache.hadoop.mapred.JobClien import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapred.TaskReport; +import org.apache.hadoop.mapreduce.Cluster; +import org.apache.hadoop.mapreduce.TaskReport; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapreduce.TaskType; import org.apache.pig.PigConfiguration; @@ -65,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.CompilationMessageCollector; @@ -78,15 +82,18 @@ import org.apache.pig.impl.util.Utils; import org.apache.pig.tools.pigstats.OutputStats; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.PigStatsUtil; +import org.apache.pig.tools.pigstats.mapreduce.MRJobStats; import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil; import org.apache.pig.tools.pigstats.mapreduce.MRScriptState; +import org.python.google.common.collect.Lists; + /** * Main class that launches pig for Map Reduce * */ -public class MapReduceLauncher extends Launcher{ +public class MapReduceLauncher extends Launcher { public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; @@ -94,14 +101,30 @@ public class MapReduceLauncher extends L private boolean aggregateWarning = false; + public MapReduceLauncher() { + super(); + Utils.addShutdownHookWithPriority(new HangingJobKiller(), + PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY); + } + @Override public void kill() { try { - log.debug("Receive kill signal"); - if (jc!=null) { + if (jc != null && jc.getRunningJobs().size() > 0) { + log.info("Received kill signal"); for (Job job : jc.getRunningJobs()) { - HadoopShims.killJob(job); + org.apache.hadoop.mapreduce.Job mrJob = job.getJob(); + try { + if (mrJob != null) { + mrJob.killJob(); + } + } catch (Exception ir) { + throw new IOException(ir); + } log.info("Job " + job.getAssignedJobID() + " killed"); + String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + .format(Calendar.getInstance().getTime()); + System.err.println(timeStamp + " Job " + job.getAssignedJobID() + " killed"); } } } catch (Exception e) { @@ -301,8 +324,7 @@ public class MapReduceLauncher extends L // Now wait, till we 
are finished. while(!jc.allFinished()){ - try { jcThread.join(sleepTime); } - catch (InterruptedException e) {} + jcThread.join(sleepTime); List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>(); @@ -321,11 +343,6 @@ public class MapReduceLauncher extends L log.info("detailed locations: " + aliasLocation); } - if (!HadoopShims.isHadoopYARN() && jobTrackerLoc != null) { - log.info("More information at: http://" + jobTrackerLoc - + "/jobdetails.jsp?jobid=" + job.getAssignedJobID()); - } - // update statistics for this job so jobId is set MRPigStatsUtil.addJobStats(job); MRScriptState.get().emitJobStartedNotification( @@ -475,10 +492,6 @@ public class MapReduceLauncher extends L for (Job job : succJobs) { List<POStore> sts = jcc.getStores(job); for (POStore st : sts) { - if (Utils.isLocal(pc, job.getJobConf())) { - HadoopShims.storeSchemaForLocal(job, st); - } - if (!st.isTmpStore()) { // create an "_SUCCESS" file in output location if // output location is a filesystem dir @@ -744,7 +757,7 @@ public class MapReduceLauncher extends L @SuppressWarnings("deprecation") void computeWarningAggregate(Job job, Map<Enum, Long> aggMap) { try { - Counters counters = HadoopShims.getCounters(job); + Counters counters = MRJobStats.getCounters(job); if (counters==null) { long nullCounterCount = @@ -798,13 +811,13 @@ public class MapReduceLauncher extends L throw new ExecException(backendException); } try { - Iterator<TaskReport> mapRep = HadoopShims.getTaskReports(job, TaskType.MAP); + Iterator<TaskReport> mapRep = MRJobStats.getTaskReports(job, TaskType.MAP); if (mapRep != null) { getErrorMessages(mapRep, "map", errNotDbg, pigContext); totalHadoopTimeSpent += computeTimeSpent(mapRep); mapRep = null; } - Iterator<TaskReport> redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE); + Iterator<TaskReport> redRep = MRJobStats.getTaskReports(job, TaskType.REDUCE); if (redRep != null) { getErrorMessages(redRep, "reduce", errNotDbg, pigContext); totalHadoopTimeSpent += computeTimeSpent(redRep); @@ -822,5 +835,6 @@ public class MapReduceLauncher extends L throw new ExecException(e); } } + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Feb 24 08:19:42 2017 @@ -65,7 +65,10 @@ public class MapReduceOper extends Opera // this is needed when the key is null to create // an appropriate NullableXXXWritable object public byte mapKeyType; - + + //record the map key types of all splittees + public byte[] mapKeyTypeOfSplittees; + //Indicates that the map plan creation //is complete boolean mapDone = false; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- 
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Fri Feb 24 08:19:42 2017
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,9 +37,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -72,7 +75,6 @@ public class PigCombiner {
 
         PhysicalOperator[] roots;
         PhysicalOperator leaf;
-        PigContext pigContext = null;
        private volatile boolean initialized = false;
 
         //@StaticDataCleanup
@@ -91,9 +93,11 @@ public class PigCombiner {
             Configuration jConf = context.getConfiguration();
             try {
                 PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
-                pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
-                if (pigContext.getLog4jProperties()!=null)
-                    PropertyConfigurator.configure(pigContext.getLog4jProperties());
+                Properties log4jProperties = (Properties) ObjectSerializer
+                        .deserialize(jConf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
+                if (log4jProperties != null) {
+                    PropertyConfigurator.configure(log4jProperties);
+                }
 
                 UDFContext.getUDFContext().reset();
                 MapRedUtil.setupUDFContext(context.getConfiguration());
@@ -143,7 +147,7 @@ public class PigCombiner {
             pigReporter.setRep(context);
             PhysicalOperator.setReporter(pigReporter);
 
-            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+            boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
             PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
             pigStatusReporter.setContext(new MRTaskContext(context));
             PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -157,7 +161,7 @@ public class PigCombiner {
             // tuples out of the getnext() call of POJoinPackage
             // In this case, we process till we see EOP from
             // POJoinPacakage.getNext()
-            if (pack.getPkgr() instanceof JoinPackager)
+            if (pack.getPkgr() instanceof JoinPackager || pack.getPkgr() instanceof BloomPackager)
             {
                 pack.attachInput(key, tupIter.iterator());
                 while (true)
@@ -268,7 +272,6 @@ public class PigCombiner {
             pigReporter = null;
             // Avoid OOM in Tez.
             PhysicalOperator.setReporter(null);
-            pigContext = null;
             roots = null;
             cp = null;
         }
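The recurring theme in this file and the two that follow is that tasks no longer deserialize the whole PigContext from pig.pigContext; the front end ships only what a task actually needs, such as the log4j Properties stored under PigImplConstants.PIG_LOG4J_PROPERTIES. A sketch of both halves of that handshake, assuming the literal key below is a stand-in for the real constant's value:

    import java.io.IOException;
    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.impl.util.ObjectSerializer;

    class Log4jConfHandshake {
        static final String PIG_LOG4J_PROPERTIES = "pig.log4j.properties"; // stand-in key

        // Front end: serialize only the Properties the task needs.
        static void write(Configuration conf, Properties log4jProps) throws IOException {
            conf.set(PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(log4jProps));
        }

        // Task side: restore it without touching PigContext.
        static Properties read(Configuration conf) throws IOException {
            return (Properties) ObjectSerializer.deserialize(conf.get(PIG_LOG4J_PROPERTIES));
        }
    }

Shipping a small Properties object instead of the full PigContext keeps the serialized job payload small and removes a deserialization dependency from every combiner, map, and reduce task.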
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Fri Feb 24 08:19:42 2017
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +46,7 @@ import org.apache.pig.data.SchemaTupleBa
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -88,7 +90,6 @@ public abstract class PigGenericMapBase
 
     private PhysicalOperator leaf;
 
-    PigContext pigContext = null;
     private volatile boolean initialized = false;
 
     /**
@@ -168,13 +169,15 @@ public abstract class PigGenericMapBase
         inIllustrator = inIllustrator(context);
 
         PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
-        pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
 
         // This attempts to fetch all of the generated code from the distributed cache, and resolve it
-        SchemaTupleBackend.initialize(job, pigContext);
+        SchemaTupleBackend.initialize(job);
 
-        if (pigContext.getLog4jProperties()!=null)
-            PropertyConfigurator.configure(pigContext.getLog4jProperties());
+        Properties log4jProperties = (Properties) ObjectSerializer
+                .deserialize(job.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
+        if (log4jProperties != null) {
+            PropertyConfigurator.configure(log4jProperties);
+        }
 
         if (mp == null)
             mp = (PhysicalPlan) ObjectSerializer.deserialize(
@@ -236,7 +239,7 @@ public abstract class PigGenericMapBase
         pigReporter.setRep(context);
         PhysicalOperator.setReporter(pigReporter);
 
-        boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+        boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
         PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
         pigStatusReporter.setContext(new MRTaskContext(context));
         PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -249,8 +252,7 @@ public abstract class PigGenericMapBase
                 MapReducePOStoreImpl impl = new MapReducePOStoreImpl(context);
                 store.setStoreImpl(impl);
-                if (!pigContext.inIllustrator)
-                    store.setUp();
+                store.setUp();
             }
         }
     }
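Likewise, per-job flags such as aggregate.warning are now read straight from the task Configuration rather than from a deserialized PigContext, so the check quietly degrades to false when the key is absent. A one-method illustration of the committed check (property name from the diff; the wrapper class is hypothetical):

    import org.apache.hadoop.conf.Configuration;

    class WarningFlag {
        /** Mirrors the diff: only the literal "true" (any case) enables warning aggregation. */
        static boolean aggregateWarnings(Configuration conf) {
            return "true".equalsIgnoreCase(conf.get("aggregate.warning"));
        }
    }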
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Fri Feb 24 08:19:42 2017
@@ -287,7 +287,6 @@ public class PigGenericMapReduce {
 
         private PhysicalOperator leaf;
 
-        PigContext pigContext = null;
         protected volatile boolean initialized = false;
 
         private boolean inIllustrator = false;
@@ -319,10 +318,9 @@ public class PigGenericMapReduce {
             sJobConf = context.getConfiguration();
             try {
                 PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
-                pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
 
                 // This attempts to fetch all of the generated code from the distributed cache, and resolve it
-                SchemaTupleBackend.initialize(jConf, pigContext);
+                SchemaTupleBackend.initialize(jConf);
 
                 if (rp == null)
                     rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
@@ -377,7 +375,7 @@ public class PigGenericMapReduce {
             pigReporter.setRep(context);
             PhysicalOperator.setReporter(pigReporter);
 
-            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+            boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
             PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
             pigStatusReporter.setContext(new MRTaskContext(context));
             PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
@@ -608,7 +606,7 @@ public class PigGenericMapReduce {
             pigReporter.setRep(context);
             PhysicalOperator.setReporter(pigReporter);
 
-            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+            boolean aggregateWarning = "true".equalsIgnoreCase(context.getConfiguration().get("aggregate.warning"));
             PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
             pigStatusReporter.setContext(new MRTaskContext(context));
             PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Fri Feb 24 08:19:42 2017
@@ -17,9 +17,6 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
-import java.util.Map;
-import java.util.WeakHashMap;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.EvalFunc;
@@ -41,7 +38,6 @@ public final class PigHadoopLogger imple
 
     private PigStatusReporter reporter = null;
     private boolean aggregate = false;
-    private Map<Object, String> msgMap = new WeakHashMap<Object, String>();
 
     private PigHadoopLogger() {
     }
@@ -68,11 +64,6 @@ public final class PigHadoopLogger imple
 
         if (getAggregate()) {
            if (reporter != null) {
-                // log at least once
-                if (msgMap.get(o) == null || !msgMap.get(o).equals(displayMessage)) {
-                    log.warn(displayMessage);
-                    msgMap.put(o, displayMessage);
-                }
                 if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
                     reporter.incrCounter(className, warningEnum.name(), 1);
                 }
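The PigHadoopLogger change drops the WeakHashMap that used to log each distinct warning message at least once; under aggregation a warning now only increments a counter named by the source class and warning enum, leaving the client to report totals after the job. A stripped-down sketch of that pattern (CounterReporter is a hypothetical stand-in for PigStatusReporter):

    interface CounterReporter {
        void incrCounter(String group, String name, long delta);
    }

    class AggregatingLogger {
        private final CounterReporter reporter;
        private final boolean aggregate;

        AggregatingLogger(CounterReporter reporter, boolean aggregate) {
            this.reporter = reporter;
            this.aggregate = aggregate;
        }

        void warn(Object source, String message, Enum<?> warningEnum) {
            if (aggregate && reporter != null) {
                // One counter tick instead of one log line per occurrence.
                reporter.incrCounter(source.getClass().getName(), warningEnum.name(), 1);
            } else {
                System.err.println(warningEnum + ": " + message);
            }
        }
    }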
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Fri Feb 24 08:19:42 2017
@@ -197,14 +197,11 @@ public class PigInputFormat extends Inpu
 
         ArrayList<FileSpec> inputs;
         ArrayList<ArrayList<OperatorKey>> inpTargets;
-        PigContext pigContext;
         try {
             inputs = (ArrayList<FileSpec>) ObjectSerializer
                     .deserialize(conf.get(PIG_INPUTS));
             inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
                     .deserialize(conf.get(PIG_INPUT_TARGETS));
-            pigContext = (PigContext) ObjectSerializer.deserialize(conf
-                    .get("pig.pigContext"));
             PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list")));
             MapRedUtil.setupUDFContext(conf);
         } catch (Exception e) {
@@ -234,7 +231,7 @@ public class PigInputFormat extends Inpu
 
                 // if the execution is against Mapred DFS, set
                 // working dir to /user/<userid>
-                if(!Utils.isLocal(pigContext, conf)) {
+                if(!Utils.isLocal(conf)) {
                     fs.setWorkingDirectory(jobcontext.getWorkingDirectory());
                 }
 
@@ -270,7 +267,7 @@ public class PigInputFormat extends Inpu
                             jobcontext.getJobID()));
                 List<InputSplit> oneInputPigSplits = getPigSplits(
                         oneInputSplits, i, inpTargets.get(i),
-                        HadoopShims.getDefaultBlockSize(fs, isFsPath? path: fs.getWorkingDirectory()),
+                        fs.getDefaultBlockSize(isFsPath? path: fs.getWorkingDirectory()),
                         combinable, confClone);
                 splits.addAll(oneInputPigSplits);
             } catch (ExecException ee) {
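Finally, PigInputFormat asks the FileSystem for its default block size through the Path-aware overload that Hadoop 2 provides, instead of routing through HadoopShims, and Utils.isLocal now needs only the Configuration. A small sketch of the block-size lookup under those assumptions (names are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class BlockSizeLookup {
        static long defaultBlockSize(Configuration conf, Path path) throws IOException {
            FileSystem fs = path.getFileSystem(conf);
            // Path-aware overload: respects per-filesystem block sizes (e.g. viewfs mounts).
            return fs.getDefaultBlockSize(path);
        }
    }

Dropping the shim indirection is safe here because the spark branch no longer targets Hadoop 1.x, where the Path-aware overload did not exist.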