Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Wed Feb 22 09:43:41 2017 @@ -70,7 +70,7 @@ public class MapRedUtil { private static Log log = LogFactory.getLog(MapRedUtil.class); private static final TupleFactory tf = TupleFactory.getInstance(); - public static final String FILE_SYSTEM_NAME = "fs.default.name"; + public static final String FILE_SYSTEM_NAME = FileSystem.FS_DEFAULT_NAME_KEY; /** * Loads the key distribution sampler file @@ -301,7 +301,7 @@ public class MapRedUtil { /** * Returns the total number of bytes for this file, or if a directory all * files in the directory. - * + * * @param fs FileSystem * @param status FileStatus * @param max Maximum value of total length that will trigger exit. Many
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Wed Feb 22 09:43:41 2017 @@ -18,7 +18,6 @@ package org.apache.pig.backend.hadoop.hb import java.io.IOException; import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.lang.reflect.UndeclaredThrowableException; import java.math.BigDecimal; import java.math.BigInteger; @@ -65,6 +64,7 @@ import org.apache.hadoop.hbase.mapreduce import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableSplit; +import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.JobConf; @@ -86,7 +86,6 @@ import org.apache.pig.ResourceSchema.Res import org.apache.pig.StoreFuncInterface; import org.apache.pig.StoreResources; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder; import org.apache.pig.builtin.FuncUtils; import org.apache.pig.builtin.Utf8StorageConverter; @@ -597,7 +596,9 @@ public class HBaseStorage extends LoadFu new BinaryComparator(colInfo.getColumnName()))); } } - thisColumnGroupFilter.addFilter(columnFilters); + if (columnFilters.getFilters().size() != 0) { + thisColumnGroupFilter.addFilter(columnFilters); + } 
allColumnFilters.addFilter(thisColumnGroupFilter); } if (allColumnFilters != null) { @@ -792,46 +793,35 @@ public class HBaseStorage extends LoadFu public List<String> getShipFiles() { // Depend on HBase to do the right thing when available, as of HBASE-9165 try { - Method addHBaseDependencyJars = - TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class); - if (addHBaseDependencyJars != null) { - Configuration conf = new Configuration(); - addHBaseDependencyJars.invoke(null, conf); - if (conf.get("tmpjars") != null) { - String[] tmpjars = conf.getStrings("tmpjars"); - List<String> shipFiles = new ArrayList<String>(tmpjars.length); - for (String tmpjar : tmpjars) { - shipFiles.add(new URL(tmpjar).getPath()); - } - return shipFiles; + Configuration conf = new Configuration(); + TableMapReduceUtil.addHBaseDependencyJars(conf); + if (conf.get("tmpjars") != null) { + String[] tmpjars = conf.getStrings("tmpjars"); + List<String> shipFiles = new ArrayList<String>(tmpjars.length); + for (String tmpjar : tmpjars) { + shipFiles.add(new URL(tmpjar).getPath()); } + return shipFiles; + } + } catch (IOException e) { + if(e instanceof MalformedURLException){ + LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars" + + " had malformed url. Falling back to previous logic.", e); + }else { + LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation" + + " failed. Falling back to previous logic.", e); } - } catch (NoSuchMethodException e) { - LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available." - + " Falling back to previous logic.", e); - } catch (IllegalAccessException e) { - LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation" - + " not permitted. Falling back to previous logic.", e); - } catch (InvocationTargetException e) { - LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation" - + " failed. 
Falling back to previous logic.", e); - } catch (MalformedURLException e) { - LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars" - + " had malformed url. Falling back to previous logic.", e); } List<Class> classList = new ArrayList<Class>(); classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server - if (!HadoopShims.isHadoopYARN()) { //Avoid shipping duplicate. Hadoop 0.23/2 itself has guava - classList.add(com.google.common.collect.Lists.class); // guava - } classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper // Additional jars that are specific to v0.95.0+ addClassToList("org.cloudera.htrace.Trace", classList); // htrace addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common - addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compar + addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compat addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty return FuncUtils.getShipFiles(classList); } @@ -882,27 +872,13 @@ public class HBaseStorage extends LoadFu } if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) { - // Will not be entering this block for 0.20.2 as it has no security. 
try { - // getCurrentUser method is not public in 0.20.2 - Method m1 = UserGroupInformation.class.getMethod("getCurrentUser"); - UserGroupInformation currentUser = (UserGroupInformation) m1.invoke(null,(Object[]) null); - // hasKerberosCredentials method not available in 0.20.2 - Method m2 = UserGroupInformation.class.getMethod("hasKerberosCredentials"); - boolean hasKerberosCredentials = (Boolean) m2.invoke(currentUser, (Object[]) null); - if (hasKerberosCredentials) { - // Class and method are available only from 0.92 security release - Class tokenUtilClass = Class - .forName("org.apache.hadoop.hbase.security.token.TokenUtil"); - Method m3 = tokenUtilClass.getMethod("obtainTokenForJob", new Class[] { - Configuration.class, UserGroupInformation.class, Job.class }); - m3.invoke(null, new Object[] { hbaseConf, currentUser, job }); + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + if (currentUser.hasKerberosCredentials()) { + TokenUtil.obtainTokenForJob(hbaseConf,currentUser,job); } else { LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available"); } - } catch (ClassNotFoundException cnfe) { - throw new RuntimeException("Failure loading TokenUtil class, " - + "is secure RPC available?", cnfe); } catch (RuntimeException re) { throw re; } catch (Exception e) { Modified: pig/branches/spark/src/org/apache/pig/builtin/Bloom.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Bloom.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/Bloom.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/Bloom.java Wed Feb 22 09:43:41 2017 @@ -35,6 +35,7 @@ import org.apache.pig.FilterFunc; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; /** * Use a Bloom filter 
build previously by BuildBloom. You would first @@ -54,14 +55,36 @@ import org.apache.pig.data.Tuple; * C = filter B by bloom(z); * D = join C by z, A by x; * It uses {@link org.apache.hadoop.util.bloom.BloomFilter}. + * + * You can also pass the Bloom filter from BuildBloom directly to Bloom UDF + * as a scalar instead of storing it to file and loading again. This is simpler + * if the Bloom filter will not be reused and needs to be discarded after the + * run of the script. + * + * define bb BuildBloom('jenkins', '100', '0.1'); + * A = load 'foo' as (x, y); + * B = group A all; + * C = foreach B generate bb(A.x) as bloomfilter; + * D = load 'bar' as (z); + * E = filter D by Bloom(C.bloomfilter, z); + * F = join E by z, A by x; */ public class Bloom extends FilterFunc { + private static TupleFactory mTupleFactory = TupleFactory.getInstance(); + private String bloomFile; - public BloomFilter filter = null; + private BloomFilter filter = null; - /** - * @param filename file containing the serialized Bloom filter + public Bloom() { + } + + /** + * The filename containing the serialized Bloom filter. If filename is null + * or the no-arg constructor is used, then the bloomfilter bytearray which + * is the output of BuildBloom should be passed as the first argument to the UDF + * + * @param filename file containing the serialized Bloom filter */ public Bloom(String filename) { bloomFile = filename; @@ -70,11 +93,25 @@ public class Bloom extends FilterFunc { @Override public Boolean exec(Tuple input) throws IOException { if (filter == null) { - init(); + init(input); } byte[] b; - if (input.size() == 1) b = DataType.toBytes(input.get(0)); - else b = DataType.toBytes(input, DataType.TUPLE); + if (bloomFile == null) { + // The first one is the bloom filter. 
Skip that + if (input.size() == 2) { + b = DataType.toBytes(input.get(1)); + } else { + List<Object> inputList = input.getAll(); + Tuple tuple = mTupleFactory.newTupleNoCopy(inputList.subList(1, inputList.size())); + b = DataType.toBytes(tuple, DataType.TUPLE); + } + } else { + if (input.size() == 1) { + b = DataType.toBytes(input.get(0)); + } else { + b = DataType.toBytes(input, DataType.TUPLE); + } + } Key k = new Key(b); return filter.membershipTest(k); @@ -82,34 +119,46 @@ public class Bloom extends FilterFunc { @Override public List<String> getCacheFiles() { - List<String> list = new ArrayList<String>(1); - // We were passed the name of the file on HDFS. Append a - // name for the file on the task node. - try { - list.add(bloomFile + "#" + getFilenameFromPath(bloomFile)); - } catch (IOException e) { - throw new RuntimeException(e); + if (bloomFile != null) { + List<String> list = new ArrayList<String>(1); + // We were passed the name of the file on HDFS. Append a + // name for the file on the task node. 
+ try { + list.add(bloomFile + "#" + getFilenameFromPath(bloomFile)); + } catch (IOException e) { + throw new RuntimeException(e); + } + return list; } - return list; + return null; } - private void init() throws IOException { - filter = new BloomFilter(); - String dir = "./" + getFilenameFromPath(bloomFile); - String[] partFiles = new File(dir) - .list(new FilenameFilter() { - @Override - public boolean accept(File current, String name) { - return name.startsWith("part"); - } - }); - - String dcFile = dir + "/" + partFiles[0]; - DataInputStream dis = new DataInputStream(new FileInputStream(dcFile)); - try { - filter.readFields(dis); - } finally { - dis.close(); + private void init(Tuple input) throws IOException { + if (bloomFile == null) { + if (input.get(0) instanceof DataByteArray) { + filter = BuildBloomBase.bloomIn((DataByteArray) input.get(0)); + } else { + throw new IllegalArgumentException("The first argument to the Bloom UDF should be" + + " the bloom filter if a bloom file is not specified in the constructor"); + } + } else { + filter = new BloomFilter(); + String dir = "./" + getFilenameFromPath(bloomFile); + String[] partFiles = new File(dir) + .list(new FilenameFilter() { + @Override + public boolean accept(File current, String name) { + return name.startsWith("part"); + } + }); + + String dcFile = dir + "/" + partFiles[0]; + DataInputStream dis = new DataInputStream(new FileInputStream(dcFile)); + try { + filter.readFields(dis); + } finally { + dis.close(); + } } } Modified: pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java Wed Feb 22 09:43:41 2017 @@ -18,16 +18,15 @@ 
package org.apache.pig.builtin; -import java.io.IOException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.util.bloom.BloomFilter; import org.apache.hadoop.util.hash.Hash; - import org.apache.pig.EvalFunc; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.data.DataBag; @@ -47,7 +46,7 @@ public abstract class BuildBloomBase<T> protected BuildBloomBase() { } - /** + /** * @param hashType type of the hashing function (see * {@link org.apache.hadoop.util.hash.Hash}). * @param mode Will be ignored, though by convention it should be @@ -64,7 +63,7 @@ public abstract class BuildBloomBase<T> hType = convertHashType(hashType); } - /** + /** * @param hashType type of the hashing function (see * {@link org.apache.hadoop.util.hash.Hash}). * @param numElements The number of distinct elements expected to be @@ -104,7 +103,7 @@ public abstract class BuildBloomBase<T> return new DataByteArray(baos.toByteArray()); } - protected BloomFilter bloomIn(DataByteArray b) throws IOException { + public static BloomFilter bloomIn(DataByteArray b) throws IOException { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(b.get())); BloomFilter f = new BloomFilter(); Modified: pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java Wed Feb 22 09:43:41 2017 @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import 
org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.shims.Hadoop23Shims; import org.apache.hadoop.hive.shims.HadoopShimsSecure; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.Counters; @@ -180,20 +181,9 @@ abstract class HiveUDFBase extends EvalF @Override public List<String> getShipFiles() { - String hadoopVersion = "20S"; - if (Utils.isHadoop23() || Utils.isHadoop2()) { - hadoopVersion = "23"; - } - Class hadoopVersionShimsClass; - try { - hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" + - hadoopVersion + "Shims"); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath"); - } List<String> files = FuncUtils.getShipFiles(new Class[] {GenericUDF.class, - PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class, - hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class}); + PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class, + Hadoop23Shims.class, HadoopShimsSecure.class, Collector.class}); return files; } Modified: pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java Wed Feb 22 09:43:41 2017 @@ -56,6 +56,7 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.shims.Hadoop23Shims; import 
org.apache.hadoop.hive.shims.HadoopShimsSecure; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; @@ -389,20 +390,8 @@ public class OrcStorage extends LoadFunc @Override public List<String> getShipFiles() { - List<String> cacheFiles = new ArrayList<String>(); - String hadoopVersion = "20S"; - if (Utils.isHadoop23() || Utils.isHadoop2()) { - hadoopVersion = "23"; - } - Class hadoopVersionShimsClass; - try { - hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" + - hadoopVersion + "Shims"); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath"); - } Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class, - org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass, + org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, Hadoop23Shims.class, Input.class}; return FuncUtils.getShipFiles(classList); } @@ -456,7 +445,7 @@ public class OrcStorage extends LoadFunc } private TypeInfo getTypeInfoFromLocation(String location, Job job) throws IOException { - FileSystem fs = FileSystem.get(job.getConfiguration()); + FileSystem fs = FileSystem.get(new Path(location).toUri(), job.getConfiguration()); Path path = getFirstFile(location, fs, new NonEmptyOrcFileFilter(fs)); if (path == null) { log.info("Cannot find any ORC files from " + location + Modified: pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java Wed Feb 22 09:43:41 2017 @@ -68,7 +68,6 @@ import org.apache.pig.backend.hadoop.exe 
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.bzip2r.Bzip2TextInputFormat; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; @@ -171,7 +170,7 @@ LoadPushDown, LoadMetadata, StoreMetadat validOptions.addOption(TAG_SOURCE_FILE, false, "Appends input source file name to beginning of each tuple."); validOptions.addOption(TAG_SOURCE_PATH, false, "Appends input source file path to beginning of each tuple."); validOptions.addOption("tagsource", false, "Appends input source file name to beginning of each tuple."); - Option overwrite = new Option(" ", "Overwrites the destination."); + Option overwrite = new Option("overwrite", "Overwrites the destination."); overwrite.setLongOpt("overwrite"); overwrite.setOptionalArg(true); overwrite.setArgs(1); @@ -412,7 +411,7 @@ LoadPushDown, LoadMetadata, StoreMetadat @Override public InputFormat getInputFormat() { if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) - && (!bzipinput_usehadoops || !HadoopShims.isHadoopYARN()) ) { + && (!bzipinput_usehadoops) ) { mLog.info("Using Bzip2TextInputFormat"); return new Bzip2TextInputFormat(); } else { Modified: pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java Wed Feb 22 09:43:41 2017 @@ -17,15 +17,63 @@ */ package 
org.apache.pig.builtin; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Partitioner; -public class RoundRobinPartitioner extends Partitioner<Writable, Writable> { - private int num = 0; +/** + * This partitioner should be used with extreme caution and only in cases + * where the order of output records is guaranteed to be same. If the order of + * output records can vary on retries which is mostly the case, map reruns + * due to shuffle fetch failures can lead to data being partitioned differently + * and result in incorrect output due to loss or duplication of data. + * Refer PIG-5041 for more details. + * + * This will be removed in the next release as it is risky to use in most cases. + */ +@Deprecated +public class RoundRobinPartitioner extends Partitioner<Writable, Writable> + implements Configurable { + + /** + * Batch size for round robin partitioning. Batch size number of records + * will be distributed to each partition in a round robin fashion. Default + * value is 0 which distributes each record in a circular fashion. Higher + * number for batch size can be used to increase probability of keeping + * similar records in the same partition if output is already sorted and get + * better compression. 
+ */ + public static String PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE = "pig.round.robin.partitioner.batch.size"; + private int num = -1; + private int batchSize = 0; + private int currentBatchCount = 0; + private Configuration conf; @Override public int getPartition(Writable key, Writable value, int numPartitions) { - num = ++num % numPartitions; + if (batchSize > 0) { + if (currentBatchCount == 0) { + num = ++num % numPartitions; + } + if (++currentBatchCount == batchSize) { + currentBatchCount = 0; + } + } else { + num = ++num % numPartitions; + } return num; } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + batchSize = conf.getInt(PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, 0); + } + + @Override + public Configuration getConf() { + return conf; + } + } Modified: pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java (original) +++ pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java Wed Feb 22 09:43:41 2017 @@ -37,7 +37,6 @@ import org.apache.pig.ResourceSchema.Res import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.bzip2r.Bzip2TextInputFormat; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; @@ -259,8 +258,7 @@ public class TextLoader extends LoadFunc @Override public InputFormat getInputFormat() { if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) - && !HadoopShims.isHadoopYARN() - && !bzipinput_usehadoops ) { + && 
!bzipinput_usehadoops ) { mLog.info("Using Bzip2TextInputFormat"); return new Bzip2TextInputFormat(); } else { Modified: pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java Wed Feb 22 09:43:41 2017 @@ -423,7 +423,7 @@ public abstract class DefaultAbstractBag } @SuppressWarnings("rawtypes") - protected void warn(String msg, Enum warningEnum, Exception e) { + protected void warn(String msg, Enum warningEnum, Throwable e) { pigLogger = PhysicalOperator.getPigLogger(); if(pigLogger != null) { pigLogger.warn(this, msg, warningEnum); Modified: pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java Wed Feb 22 09:43:41 2017 @@ -22,11 +22,11 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.io.FileNotFoundException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,12 +42,12 @@ import org.apache.pig.PigWarning; public class DefaultDataBag extends DefaultAbstractBag { /** - * + * */ private static final long serialVersionUID = 2L; private static final 
Log log = LogFactory.getLog(DefaultDataBag.class); - + private static final InterSedes SEDES = InterSedesFactory.getInterSedesInstance(); public DefaultDataBag() { @@ -70,12 +70,12 @@ public class DefaultDataBag extends Defa public boolean isSorted() { return false; } - + @Override public boolean isDistinct() { return false; } - + @Override public Iterator<Tuple> iterator() { return new DefaultDataBagIterator(); @@ -110,12 +110,15 @@ public class DefaultDataBag extends Defa if ((spilled & 0x3fff) == 0) reportProgress(); } out.flush(); - } catch (IOException ioe) { + out.close(); + out = null; + mContents.clear(); + } catch (Throwable e) { // Remove the last file from the spilled array, since we failed to // write to it. mSpillFiles.remove(mSpillFiles.size() - 1); warn( - "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe); + "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e); return 0; } finally { if (out != null) { @@ -126,7 +129,6 @@ public class DefaultDataBag extends Defa } } } - mContents.clear(); } // Increment the spill count incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT); @@ -156,7 +158,7 @@ public class DefaultDataBag extends Defa } @Override - public boolean hasNext() { + public boolean hasNext() { // Once we call hasNext(), set the flag, so we can call hasNext() repeated without fetching next tuple if (hasCachedTuple) return (mBuf != null); @@ -209,7 +211,7 @@ public class DefaultDataBag extends Defa } catch (FileNotFoundException fnfe) { // We can't find our own spill file? That should never // happen. 
- String msg = "Unable to find our spill file."; + String msg = "Unable to find our spill file."; log.fatal(msg, fnfe); throw new RuntimeException(msg, fnfe); } @@ -223,7 +225,7 @@ public class DefaultDataBag extends Defa log.fatal(msg, eof); throw new RuntimeException(msg, eof); } catch (IOException ioe) { - String msg = "Unable to read our spill file."; + String msg = "Unable to read our spill file."; log.fatal(msg, ioe); throw new RuntimeException(msg, ioe); } @@ -259,7 +261,7 @@ public class DefaultDataBag extends Defa log.warn("Failed to close spill file.", e); } } catch (IOException ioe) { - String msg = "Unable to read our spill file."; + String msg = "Unable to read our spill file."; log.fatal(msg, ioe); throw new RuntimeException(msg, ioe); } Modified: pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java Wed Feb 22 09:43:41 2017 @@ -67,17 +67,17 @@ public class DistinctDataBag extends Def public boolean isSorted() { return false; } - + @Override public boolean isDistinct() { return true; } - - + + @Override public long size() { if (mSpillFiles != null && mSpillFiles.size() > 0){ - //We need to racalculate size to guarantee a count of unique + //We need to racalculate size to guarantee a count of unique //entries including those on disk Iterator<Tuple> iter = iterator(); int newSize = 0; @@ -85,7 +85,7 @@ public class DistinctDataBag extends Def newSize++; iter.next(); } - + synchronized(mContents) { //we don't want adds to change our numbers //the lock may need to cover more of the method @@ -94,8 +94,8 @@ public class DistinctDataBag extends Def } return mSize; } - - + + @Override 
public Iterator<Tuple> iterator() { return new DistinctDataBagIterator(); @@ -155,12 +155,15 @@ public class DistinctDataBag extends Def } } out.flush(); - } catch (IOException ioe) { + out.close(); + out = null; + mContents.clear(); + } catch (Throwable e) { // Remove the last file from the spilled array, since we failed to // write to it. mSpillFiles.remove(mSpillFiles.size() - 1); warn( - "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe); + "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e); return 0; } finally { if (out != null) { @@ -171,7 +174,6 @@ public class DistinctDataBag extends Def } } } - mContents.clear(); } // Increment the spill count incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT); @@ -208,7 +210,7 @@ public class DistinctDataBag extends Def @Override public int hashCode() { - return tuple.hashCode(); + return tuple.hashCode(); } } @@ -237,7 +239,7 @@ public class DistinctDataBag extends Def } @Override - public boolean hasNext() { + public boolean hasNext() { // See if we can find a tuple. If so, buffer it. mBuf = next(); return mBuf != null; @@ -295,7 +297,7 @@ public class DistinctDataBag extends Def } catch (FileNotFoundException fnfe) { // We can't find our own spill file? That should never // happen. - String msg = "Unable to find our spill file."; + String msg = "Unable to find our spill file."; log.fatal(msg, fnfe); throw new RuntimeException(msg, fnfe); } @@ -346,7 +348,7 @@ public class DistinctDataBag extends Def Iterator<File> i = mSpillFiles.iterator(); while (i.hasNext()) { try { - DataInputStream in = + DataInputStream in = new DataInputStream(new BufferedInputStream( new FileInputStream(i.next()))); mStreams.add(in); @@ -502,7 +504,7 @@ public class DistinctDataBag extends Def addToQueue(null, mStreams.size() - 1); i.remove(); filesToDelete.add(f); - + } catch (FileNotFoundException fnfe) { // We can't find our own spill file? That should // neer happen. 
@@ -545,7 +547,7 @@ public class DistinctDataBag extends Def log.warn("Failed to delete spill file: " + f.getPath()); } } - + // clear the list, so that finalize does not delete any files, // when mSpillFiles is assigned a new value mSpillFiles.clear(); @@ -560,6 +562,6 @@ public class DistinctDataBag extends Def } } } - + } Modified: pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java Wed Feb 22 09:43:41 2017 @@ -50,6 +50,9 @@ public class ReadOnceBag implements Data */ private static final long serialVersionUID = 2L; + public ReadOnceBag() { + } + /** * This constructor creates a bag out of an existing iterator * of tuples by taking ownership of the iterator and NOT Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java (original) +++ pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java Wed Feb 22 09:43:41 2017 @@ -39,6 +39,7 @@ import org.apache.pig.data.utils.Structu import org.apache.pig.data.utils.StructuresHelper.Triple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.util.Utils; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -272,14 +273,20 @@ public class SchemaTupleBackend { private static SchemaTupleBackend stb; public static void initialize(Configuration jConf, 
PigContext pigContext) throws IOException { - initialize(jConf, pigContext, pigContext.getExecType().isLocal()); + if (stb != null) { + SchemaTupleFrontend.lazyReset(pigContext); + } + initialize(jConf, pigContext.getExecType().isLocal()); } - public static void initialize(Configuration jConf, PigContext pigContext, boolean isLocal) throws IOException { + public static void initialize(Configuration jConf) throws IOException { + initialize(jConf, Utils.isLocal(jConf)); + } + + public static void initialize(Configuration jConf, boolean isLocal) throws IOException { if (stb != null) { LOG.warn("SchemaTupleBackend has already been initialized"); } else { - SchemaTupleFrontend.lazyReset(pigContext); SchemaTupleFrontend.reset(); SchemaTupleBackend stbInstance = new SchemaTupleBackend(jConf, isLocal); stbInstance.copyAndResolve(); Modified: pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java Wed Feb 22 09:43:41 2017 @@ -32,7 +32,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.ListIterator; import java.util.PriorityQueue; - + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.pig.PigCounters; @@ -44,14 +44,14 @@ import org.apache.pig.PigWarning; * stored unsorted as it comes in, and only sorted when it is time to dump * it to a file or when the first iterator is requested. Experementation * found this to be the faster than storing it sorted to begin with. - * + * * We allow a user defined comparator, but provide a default comparator in * cases where the user doesn't specify one. 
*/ public class SortedDataBag extends DefaultAbstractBag{ /** - * + * */ private static final long serialVersionUID = 2L; @@ -76,7 +76,7 @@ public class SortedDataBag extends Defau @Override public int hashCode() { - return 42; + return 42; } } @@ -95,12 +95,12 @@ public class SortedDataBag extends Defau public boolean isSorted() { return true; } - + @Override public boolean isDistinct() { return false; } - + @Override public Iterator<Tuple> iterator() { return new SortedDataBagIterator(); @@ -145,12 +145,15 @@ public class SortedDataBag extends Defau if ((spilled & 0x3fff) == 0) reportProgress(); } out.flush(); - } catch (IOException ioe) { + out.close(); + out = null; + mContents.clear(); + } catch (Throwable e) { // Remove the last file from the spilled array, since we failed to // write to it. mSpillFiles.remove(mSpillFiles.size() - 1); warn( - "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe); + "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e); return 0; } finally { if (out != null) { @@ -161,7 +164,6 @@ public class SortedDataBag extends Defau } } } - mContents.clear(); } // Increment the spill count incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT); @@ -203,7 +205,7 @@ public class SortedDataBag extends Defau @Override public int hashCode() { - return tuple.hashCode(); + return tuple.hashCode(); } } @@ -228,7 +230,7 @@ public class SortedDataBag extends Defau } @Override - public boolean hasNext() { + public boolean hasNext() { // See if we can find a tuple. If so, buffer it. 
mBuf = next(); return mBuf != null; @@ -341,7 +343,7 @@ public class SortedDataBag extends Defau Iterator<File> i = mSpillFiles.iterator(); while (i.hasNext()) { try { - DataInputStream in = + DataInputStream in = new DataInputStream(new BufferedInputStream( new FileInputStream(i.next()))); mStreams.add(in); @@ -351,7 +353,7 @@ public class SortedDataBag extends Defau } catch (FileNotFoundException fnfe) { // We can't find our own spill file? That should // never happen. - String msg = "Unable to find our spill file."; + String msg = "Unable to find our spill file."; log.fatal(msg, fnfe); throw new RuntimeException(msg, fnfe); } @@ -411,7 +413,7 @@ public class SortedDataBag extends Defau in.close(); }catch(IOException e) { log.warn("Failed to close spill file.", e); - } + } mStreams.set(fileNum, null); } catch (IOException ioe) { String msg = "Unable to find our spill file."; @@ -518,7 +520,7 @@ public class SortedDataBag extends Defau log.warn("Failed to delete spill file: " + f.getPath()); } } - + // clear the list, so that finalize does not delete any files, // when mSpillFiles is assigned a new value mSpillFiles.clear(); Modified: pig/branches/spark/src/org/apache/pig/data/SortedSpillBag.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SortedSpillBag.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/SortedSpillBag.java (original) +++ pig/branches/spark/src/org/apache/pig/data/SortedSpillBag.java Wed Feb 22 09:43:41 2017 @@ -29,7 +29,7 @@ import org.apache.pig.classification.Int /** * Common functionality for proactively spilling bags that need to keep the data - * sorted. + * sorted. 
*/ @InterfaceAudience.Private @InterfaceStability.Evolving @@ -54,9 +54,9 @@ public abstract class SortedSpillBag ext //count for number of objects that have spilled if(mSpillFiles == null) incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_BAGS); - + long spilled = 0; - + DataOutputStream out = null; try { out = getSpillFile(); @@ -71,13 +71,13 @@ public abstract class SortedSpillBag ext //sort the tuples // as per documentation of collection.sort(), it copies to an array, // sorts and copies back to collection - // Avoiding that extra copy back to collection (mContents) by + // Avoiding that extra copy back to collection (mContents) by // copying to an array and using Arrays.sort Tuple[] array = new Tuple[mContents.size()]; mContents.toArray(array); if(comp == null) Arrays.sort(array); - else + else Arrays.sort(array,comp); //dump the array @@ -89,12 +89,15 @@ public abstract class SortedSpillBag ext } out.flush(); - } catch (IOException ioe) { + out.close(); + out = null; + mContents.clear(); + } catch (Throwable e) { // Remove the last file from the spilled array, since we failed to // write to it. 
mSpillFiles.remove(mSpillFiles.size() - 1); warn( - "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe); + "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e); return 0; } finally { if (out != null) { @@ -105,11 +108,9 @@ public abstract class SortedSpillBag ext } } } - mContents.clear(); - incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_RECS, spilled); - + return spilled; } - + } Modified: pig/branches/spark/src/org/apache/pig/data/UnlimitedNullTuple.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/UnlimitedNullTuple.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/UnlimitedNullTuple.java (original) +++ pig/branches/spark/src/org/apache/pig/data/UnlimitedNullTuple.java Wed Feb 22 09:43:41 2017 @@ -28,7 +28,7 @@ public class UnlimitedNullTuple extends @Override public int size() { - throw new RuntimeException("Unimplemented"); + return Integer.MAX_VALUE; } @Override Modified: pig/branches/spark/src/org/apache/pig/data/utils/SedesHelper.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/utils/SedesHelper.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/data/utils/SedesHelper.java (original) +++ pig/branches/spark/src/org/apache/pig/data/utils/SedesHelper.java Wed Feb 22 09:43:41 2017 @@ -61,25 +61,25 @@ public class SedesHelper { public static void writeChararray(DataOutput out, String s) throws IOException { // a char can take up to 3 bytes in the modified utf8 encoding // used by DataOutput.writeUTF, so use UNSIGNED_SHORT_MAX/3 - if (s.length() < BinInterSedes.UNSIGNED_SHORT_MAX / 3) { + byte[] utfBytes = s.getBytes(BinInterSedes.UTF8); + int length = utfBytes.length; + if (length < BinInterSedes.UNSIGNED_SHORT_MAX) { 
out.writeByte(BinInterSedes.SMALLCHARARRAY); - out.writeUTF(s); + out.writeShort(length); } else { - byte[] utfBytes = s.getBytes(BinInterSedes.UTF8); - int length = utfBytes.length; - out.writeByte(BinInterSedes.CHARARRAY); out.writeInt(length); - out.write(utfBytes); } + out.write(utfBytes); } public static String readChararray(DataInput in, byte type) throws IOException { + int size; if (type == BinInterSedes.SMALLCHARARRAY) { - return in.readUTF(); + size = in.readUnsignedShort(); + } else { + size = in.readInt(); } - - int size = in.readInt(); byte[] buf = new byte[size]; in.readFully(buf); return new String(buf, BinInterSedes.UTF8); Modified: pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java Wed Feb 22 09:43:41 2017 @@ -38,6 +38,12 @@ public class PigImplConstants { public static final String PIG_OPTIMIZER_RULES_KEY = "pig.optimizer.rules"; /** + * Used by pig to indicate that current job is running in local mode (local/tez_local) + * ie. 
ExecType.isLocal() is true + */ + public static final String PIG_EXECTYPE_MODE_LOCAL = "pig.exectype.mode.local"; + + /** + * Used by pig to indicate that current job has been converted to run in local mode */ public static final String CONVERTED_TO_LOCAL = "pig.job.converted.local"; @@ -63,4 +69,24 @@ public class PigImplConstants { * Parallelism to be used for CROSS operation by GFCross UDF */ public static final String PIG_CROSS_PARALLELISM = "pig.cross.parallelism"; + + /** + * Pig context + */ + public static final String PIG_CONTEXT = "pig.pigContext"; + + /** + * Pig log4j properties + */ + public static final String PIG_LOG4J_PROPERTIES = "pig.log4j.properties"; + + /** + * A unique id for a Pig session used as callerId for underlying component + */ + public static final String PIG_AUDIT_ID = "pig.script.id"; + + // Kill the jobs before cleaning up tmp files + public static int SHUTDOWN_HOOK_JOB_KILL_PRIORITY = 3; + public static int SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY = 2; + public static int SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY = 1; } Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Wed Feb 22 09:43:41 2017 @@ -64,13 +64,13 @@ import org.apache.pig.impl.util.ObjectSe public class DefaultIndexableLoader extends LoadFunc implements IndexableLoadFunc{ private static final Log LOG = LogFactory.getLog(DefaultIndexableLoader.class); - + // FileSpec of index file which will be read from HDFS. 
private String indexFile; private String indexFileLoadFuncSpec; - + private LoadFunc loader; - // Index is modeled as FIFO queue and LinkedList implements java Queue interface. + // Index is modeled as FIFO queue and LinkedList implements java Queue interface. private LinkedList<Tuple> index; private FuncSpec rightLoaderFuncSpec; @@ -79,9 +79,9 @@ public class DefaultIndexableLoader exte private transient TupleFactory mTupleFactory; private String inpLocation; - + public DefaultIndexableLoader( - String loaderFuncSpec, + String loaderFuncSpec, String indexFile, String indexFileLoadFuncSpec, String scope, @@ -93,39 +93,39 @@ public class DefaultIndexableLoader exte this.scope = scope; this.inpLocation = inputLocation; } - + @SuppressWarnings("unchecked") @Override public void seekNear(Tuple keys) throws IOException{ // some setup mTupleFactory = TupleFactory.getInstance(); - /* Currently whole of index is read into memory. Typically, index is small. Usually + /* Currently whole of index is read into memory. Typically, index is small. Usually few KBs in size. So, this should not be an issue. However, reading whole index at startup time is not required. So, this can be improved upon. Assumption: Index being read is sorted on keys followed by filename, followed by offset. */ // Index is modeled as FIFO Queue, that frees us from keeping track of which index entry should be read next. - + // the keys are sent in a tuple. If there is really only // 1 join key, it would be the first field of the tuple. If // there are multiple Join keys, the tuple itself represents // the join key Object firstLeftKey = (keys.size() == 1 ? 
keys.get(0): keys); POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new FuncSpec(indexFileLoadFuncSpec))); - + Properties props = ConfigurationUtil.getLocalFSProperties(); PigContext pc = new PigContext(ExecType.LOCAL, props); ld.setPc(pc); index = new LinkedList<Tuple>(); for(Result res=ld.getNextTuple();res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNextTuple()) - index.offer((Tuple) res.result); + index.offer((Tuple) res.result); + - Tuple prevIdxEntry = null; Tuple matchedEntry; - + // When the first call is made, we need to seek into right input at correct offset. while(true){ // Keep looping till we find first entry in index >= left key @@ -148,15 +148,15 @@ public class DefaultIndexableLoader exte prevIdxEntry = curIdxEntry; continue; } - + if(((Comparable)extractedKey).compareTo(firstLeftKey) >= 0){ index.addFirst(curIdxEntry); // We need to add back the current index Entry because we are reading ahead. if(null == prevIdxEntry) // very first entry in index. matchedEntry = curIdxEntry; else{ - matchedEntry = prevIdxEntry; + matchedEntry = prevIdxEntry; // start join from previous idx entry, it might have tuples - // with this key + // with this key index.addFirst(prevIdxEntry); } break; @@ -168,43 +168,43 @@ public class DefaultIndexableLoader exte if (matchedEntry == null) { LOG.warn("Empty index file: input directory is empty"); } else { - + Object extractedKey = extractKeysFromIdxTuple(matchedEntry); - + if (extractedKey != null) { Class idxKeyClass = extractedKey.getClass(); if( ! firstLeftKey.getClass().equals(idxKeyClass)){ - + // This check should indeed be done on compile time. But to be on safe side, we do it on runtime also. int errCode = 2166; String errMsg = "Key type mismatch. Found key of type "+firstLeftKey.getClass().getCanonicalName()+" on left side. 
But, found key of type "+ idxKeyClass.getCanonicalName()+" in index built for right side."; throw new ExecException(errMsg,errCode,PigException.BUG); } - } + } } - + //add remaining split indexes to splitsAhead array int [] splitsAhead = new int[index.size()]; int splitsAheadIdx = 0; for(Tuple t : index){ splitsAhead[splitsAheadIdx++] = (Integer) t.get( t.size()-1 ); } - + initRightLoader(splitsAhead); } - + private void initRightLoader(int [] splitsToBeRead) throws IOException{ - PigContext pc = (PigContext) ObjectSerializer - .deserialize(PigMapReduce.sJobConfInternal.get().get("pig.pigContext")); - - Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties()); - + Properties properties = (Properties) ObjectSerializer + .deserialize(PigMapReduce.sJobConfInternal.get().get("pig.client.sys.props")); + + Configuration conf = ConfigurationUtil.toConfiguration(properties); + // Hadoop security need this property to be set if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) { - conf.set(MRConfiguration.JOB_CREDENTIALS_BINARY, + conf.set(MRConfiguration.JOB_CREDENTIALS_BINARY, System.getenv("HADOOP_TOKEN_FILE_LOCATION")); } - + //create ReadToEndLoader that will read the given splits in order loader = new ReadToEndLoader((LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec), conf, inpLocation, splitsToBeRead); @@ -216,7 +216,7 @@ public class DefaultIndexableLoader exte if(idxTupSize == 3) return idxTuple.get(0); - + int numColsInKey = (idxTupSize - 2); List<Object> list = new ArrayList<Object>(numColsInKey); for(int i=0; i < numColsInKey; i++) @@ -228,13 +228,13 @@ public class DefaultIndexableLoader exte private OperatorKey genKey(){ return new OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope)); } - + @Override public Tuple getNext() throws IOException { Tuple t = loader.getNext(); return t; } - + @Override public void close() throws IOException { } @@ -242,14 +242,14 @@ public class DefaultIndexableLoader exte 
@Override public void initialize(Configuration conf) throws IOException { // nothing to do - + } @Override public InputFormat getInputFormat() throws IOException { throw new UnsupportedOperationException(); } - + @Override public LoadCaster getLoadCaster() throws IOException { throw new UnsupportedOperationException(); @@ -264,7 +264,7 @@ public class DefaultIndexableLoader exte public void setLocation(String location, Job job) throws IOException { // nothing to do } - + public void setIndexFile(String indexFile) { this.indexFile = indexFile; } Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java Wed Feb 22 09:43:41 2017 @@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.pig.EvalFunc; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; @@ -42,7 +43,7 @@ public class GFCross extends EvalFunc<Da private BagFactory mBagFactory = BagFactory.getInstance(); private TupleFactory mTupleFactory = TupleFactory.getInstance(); private int parallelism = 0; - private Random r = new Random(); + private Random r; private String crossKey; static private final int DEFAULT_PARALLELISM = 96; @@ -69,6 +70,14 @@ public class GFCross extends EvalFunc<Da if (parallelism < 0) { throw new IOException(PigImplConstants.PIG_CROSS_PARALLELISM + "." 
+ crossKey + " was " + parallelism); } + long taskIdHashCode = cfg.get(MRConfiguration.TASK_ID).hashCode(); + long seed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL); + r = new Random(seed); + } else { + // Don't see a case where cfg can be null. + // But there is an existing testcase TestGFCross.testDefault + // Using constant generated from task_14738102975522_0001_r_000000 hashcode + r = new Random(-4235927512599300514L); } numInputs = (Integer)input.get(0); Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Wed Feb 22 09:43:41 2017 @@ -90,7 +90,9 @@ public class PoissonSampleLoader extends // number of tuples to be skipped Tuple t = loader.getNext(); if(t == null) { - return createNumRowTuple(null); + // since skipInterval is -1, no previous sample, + // and next sample is null -> the data set is empty + return null; } long availRedMem = (long) ( totalMemory * heapPerc); // availRedMem = 155084396; Modified: pig/branches/spark/src/org/apache/pig/impl/io/NullableTuple.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/NullableTuple.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/io/NullableTuple.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/io/NullableTuple.java Wed Feb 22 09:43:41 2017 @@ -57,6 +57,8 @@ public class NullableTuple extends PigNu public void readFields(DataInput in) throws IOException { boolean nullness = in.readBoolean(); 
setNull(nullness); + // Free up the previous value for GC + mValue = null; if (!nullness) { mValue = bis.readTuple(in); } Modified: pig/branches/spark/src/org/apache/pig/impl/io/PigFile.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/PigFile.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/io/PigFile.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/io/PigFile.java Wed Feb 22 09:43:41 2017 @@ -102,7 +102,7 @@ public class PigFile { if(oc.needsTaskCommit(tac)) { oc.commitTask(tac); } - HadoopShims.commitOrCleanup(oc, jc); + oc.commitJob(jc); } @Override Modified: pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java Wed Feb 22 09:43:41 2017 @@ -40,17 +40,16 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.data.SchemaTupleBackend; import org.apache.pig.data.Tuple; -import org.apache.pig.impl.PigContext; import org.apache.pig.impl.plan.OperatorKey; /** * This is wrapper Loader which wraps a real LoadFunc underneath and allows - * to read a file completely starting a given split (indicated by a split index + * to read a file completely starting a given split (indicated by a split index * which is used to look in the List<InputSplit> returned by the underlying * InputFormat's getSplits() method). So if the supplied split index is 0, this * loader will read the entire file. 
If it is non zero it will read the partial * file beginning from that split to the last split. - * + * * The call sequence to use this is: * 1) construct an object using the constructor * 2) Call getNext() in a loop till it returns null @@ -61,52 +60,50 @@ public class ReadToEndLoader extends Loa * the wrapped LoadFunc which will do the actual reading */ private LoadFunc wrappedLoadFunc; - + /** * the Configuration object used to locate the input location - this will * be used to call {@link LoadFunc#setLocation(String, Job)} on * the wrappedLoadFunc */ private Configuration conf; - + /** * the input location string (typically input file/dir name ) */ private String inputLocation; - + /** * If the splits to be read are not in increasing sequence of integers * this array can be used */ private int[] toReadSplits = null; - + /** * index into toReadSplits */ private int toReadSplitsIdx = 0; - + /** * the index of the split the loader is currently reading from */ private int curSplitIndex; - + /** * the input splits returned by underlying {@link InputFormat#getSplits(JobContext)} */ private List<InputSplit> inpSplits = null; - + /** * underlying RecordReader */ private RecordReader reader = null; - + /** * underlying InputFormat */ private InputFormat inputFormat = null; - - private PigContext pigContext; - + private String udfContextSignature = null; /** @@ -114,8 +111,8 @@ public class ReadToEndLoader extends Loa * @param conf * @param inputLocation * @param splitIndex - * @throws IOException - * @throws InterruptedException + * @throws IOException + * @throws InterruptedException */ public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf, String inputLocation, int splitIndex) throws IOException { @@ -125,17 +122,7 @@ public class ReadToEndLoader extends Loa this.curSplitIndex = splitIndex; init(); } - - public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf, - String inputLocation, int splitIndex, PigContext pigContext) throws IOException { 
- this.wrappedLoadFunc = wrappedLoadFunc; - this.inputLocation = inputLocation; - this.conf = conf; - this.curSplitIndex = splitIndex; - this.pigContext = pigContext; - init(); - } - + public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf, String inputLocation, int splitIndex, String signature) throws IOException { this.udfContextSignature = signature; @@ -147,14 +134,14 @@ public class ReadToEndLoader extends Loa } /** - * This constructor takes an array of split indexes (toReadSplitIdxs) of the + * This constructor takes an array of split indexes (toReadSplitIdxs) of the * splits to be read. * @param wrappedLoadFunc * @param conf * @param inputLocation * @param toReadSplitIdxs - * @throws IOException - * @throws InterruptedException + * @throws IOException + * @throws InterruptedException */ public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf, String inputLocation, int[] toReadSplitIdxs) throws IOException { @@ -166,21 +153,21 @@ public class ReadToEndLoader extends Loa toReadSplitIdxs.length > 0 ? 
toReadSplitIdxs[0] : Integer.MAX_VALUE; init(); } - + @SuppressWarnings("unchecked") private void init() throws IOException { - if (conf != null && pigContext != null) { - SchemaTupleBackend.initialize(conf, pigContext, true); + if (conf != null) { + SchemaTupleBackend.initialize(conf, true); } // make a copy so that if the underlying InputFormat writes to the // conf, we don't affect the caller's copy conf = new Configuration(conf); - // let's initialize the wrappedLoadFunc + // let's initialize the wrappedLoadFunc Job job = new Job(conf); wrappedLoadFunc.setUDFContextSignature(this.udfContextSignature); - wrappedLoadFunc.setLocation(inputLocation, + wrappedLoadFunc.setLocation(inputLocation, job); // The above setLocation call could write to the conf within // the job - get a hold of the modified conf @@ -191,10 +178,10 @@ public class ReadToEndLoader extends Loa new JobID())); } catch (InterruptedException e) { throw new IOException(e); - } + } } - private boolean initializeReader() throws IOException, + private boolean initializeReader() throws IOException, InterruptedException { // Close the previous reader first if(reader != null) { @@ -206,14 +193,14 @@ public class ReadToEndLoader extends Loa return false; } InputSplit curSplit = inpSplits.get(curSplitIndex); - TaskAttemptContext tAContext = HadoopShims.createTaskAttemptContext(conf, + TaskAttemptContext tAContext = HadoopShims.createTaskAttemptContext(conf, new TaskAttemptID()); reader = inputFormat.createRecordReader(curSplit, tAContext); reader.initialize(curSplit, tAContext); // create a dummy pigsplit - other than the actual split, the other // params are really not needed here where we are just reading the // input completely - PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1, + PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1, new ArrayList<OperatorKey>(), -1); // Set the conf object so that if the wrappedLoadFunc uses it, // it won't be null @@ -244,7 +231,7 @@ public 
class ReadToEndLoader extends Loa throw new IOException(e); } } - + private Tuple getNextHelper() throws IOException, InterruptedException { Tuple t = null; while(initializeReader()) { @@ -258,8 +245,8 @@ public class ReadToEndLoader extends Loa } return null; } - - + + /** * Updates curSplitIndex , just increment if splitIndexes is null, * else get next split in splitIndexes @@ -331,7 +318,7 @@ public class ReadToEndLoader extends Loa ((LoadMetadata) wrappedLoadFunc).setPartitionFilter(partitionFilter); } } - + @Override public void setUDFContextSignature(String signature) { this.udfContextSignature = signature; Modified: pig/branches/spark/src/org/apache/pig/impl/plan/NodeIdGenerator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/plan/NodeIdGenerator.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/plan/NodeIdGenerator.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/plan/NodeIdGenerator.java Wed Feb 22 09:43:41 2017 @@ -20,43 +20,78 @@ package org.apache.pig.impl.plan; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import com.google.common.annotations.VisibleForTesting; +/** + * Generates IDs as long values in a thread safe manner. Each thread has its own generated IDs. + */ public class NodeIdGenerator { - private Map<String, Long> scopeToIdMap; - private static NodeIdGenerator theGenerator = new NodeIdGenerator(); - - private NodeIdGenerator() { - scopeToIdMap = new HashMap<String, Long>(); - } - + /** + * Holds a map of generated scoped-IDs per thread. Each map holds generated IDs per scope. + */ + private ThreadLocal<Map<String, AtomicLong>> scopeToIdMap + = new ThreadLocal<Map<String, AtomicLong>>() { + protected Map<String, AtomicLong> initialValue() { + return new HashMap<String,AtomicLong>(); + } + }; + + /** + * Singleton instance. 
+ */ + private static final NodeIdGenerator theGenerator = new NodeIdGenerator(); + + /** + * Private default constructor to force singleton use-case of this class. + */ + private NodeIdGenerator() {} + + /** + * Returns the NodeIdGenerator singleton. + * @return + */ public static NodeIdGenerator getGenerator() { return theGenerator; } - public long getNextNodeId(String scope) { - Long val = scopeToIdMap.get(scope); - - long nextId = 0; - - if (val != null) { - nextId = val.longValue(); - } - - scopeToIdMap.put(scope, nextId + 1); - - return nextId; + /** + * Returns the next ID to be used for the current Thread. + * + * @param scope + * @return + */ + public long getNextNodeId(final String scope) { + // ThreadLocal usage protects us from having the same HashMap instance + // being used by several threads, so we can use it without synchronized + // blocks and still be thread-safe. + Map<String, AtomicLong> map = scopeToIdMap.get(); + + // the concurrent properties of the AtomicLong are useless here but + // since it cost less to use such an object rather than created a + // Long object instance each time we increment a counter ... + AtomicLong l = map.get(scope); + if ( l == null ) + map.put( scope, l = new AtomicLong() ); + return l.getAndIncrement(); } + /** + * Reset the given scope IDs to 0 for the current Thread. + * @param scope + */ @VisibleForTesting - public static void reset(String scope) { - theGenerator.scopeToIdMap.put(scope, 0L) ; + public static void reset(final String scope) { + theGenerator.scopeToIdMap.get().remove(scope); } + /** + * Reset all scope IDs to 0 for the current Thread. 
+ */ @VisibleForTesting public static void reset() { - theGenerator.scopeToIdMap.clear(); + theGenerator.scopeToIdMap.remove(); } } Modified: pig/branches/spark/src/org/apache/pig/impl/streaming/ExecutableManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/streaming/ExecutableManager.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/streaming/ExecutableManager.java Wed Feb 22 09:43:41 2017 @@ -150,12 +150,13 @@ public class ExecutableManager { LOG.debug("Process exited with: " + exitCode); if (exitCode != SUCCESS) { - LOG.error(command + " failed with exit status: " - + exitCode); + String errMsg = "'" + command.toString() + "'" + " failed with exit status: " + exitCode; + LOG.error(errMsg); + Result res = new Result(POStatus.STATUS_ERR, errMsg); + sendOutput(poStream.getBinaryOutputQueue(), res); } - if (outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) { - + if (exitCode == SUCCESS && outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) { // Trigger the outputHandler outputHandler.bindTo("", null, 0, -1); @@ -178,10 +179,18 @@ public class ExecutableManager { * @param process the process to be killed * @throws IOException */ - private void killProcess(Process process) throws IOException { + private void killProcess(Process process) { if (process != null) { - inputHandler.close(process); - outputHandler.close(); + try { + inputHandler.close(process); + } catch (Exception e) { + LOG.info("Exception in killProcess while closing inputHandler. Ignoring:" + e.getMessage()); + } + try { + outputHandler.close(); + } catch (Exception e) { + LOG.info("Exception in killProcess while closing outputHandler. 
Ignoring:" + e.getMessage()); + } process.destroy(); } } @@ -334,7 +343,7 @@ public class ExecutableManager { // we will only call close() here and not // worry about deducing whether the process died // normally or abnormally - if there was any real - // issue the ProcessOutputThread should see + // issue we should see // a non zero exit code from the process and send // a POStatus.STATUS_ERR back - what if we got // an IOException because there was only an issue with @@ -344,14 +353,6 @@ public class ExecutableManager { return; } else { // asynchronous case - then this is a real exception - LOG.error("Exception while trying to write to stream binary's input", e); - // send POStatus.STATUS_ERR to POStream to signal the error - // Generally the ProcessOutputThread would do this but now - // we should do it here since neither the process nor the - // ProcessOutputThread will ever be spawned - Result res = new Result(POStatus.STATUS_ERR, - "Exception while trying to write to stream binary's input" + e.getMessage()); - sendOutput(poStream.getBinaryOutputQueue(), res); throw e; } } @@ -362,13 +363,13 @@ public class ExecutableManager { } catch (Throwable t) { // Note that an error occurred outerrThreadsError = t; - LOG.error( "Error while reading from POStream and " + - "passing it to the streaming process", t); - try { - killProcess(process); - } catch (IOException ioe) { - LOG.warn(ioe); - } + Result res = new Result(POStatus.STATUS_ERR, + "Error while reading from POStream and " + + "passing it to the streaming process:" + t.getMessage()); + LOG.error("Error while reading from POStream and " + + "passing it to the streaming process:", t); + sendOutput(poStream.getBinaryOutputQueue(), res); + killProcess(process); } } } @@ -452,13 +453,7 @@ public class ExecutableManager { try { exitCode = process.waitFor(); } catch (InterruptedException ie) { - try { - killProcess(process); - } catch (IOException e) { - LOG.warn("Exception trying to kill process while processing 
null output " + - "from binary", e); - - } + killProcess(process); // signal error String errMsg = "Failure while waiting for process (" + command.toString() + ")" + ie.getMessage(); Modified: pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java Wed Feb 22 09:43:41 2017 @@ -175,8 +175,10 @@ public abstract class OutputHandler { */ public synchronized void close() throws IOException { if(!alreadyClosed) { - istream.close(); - istream = null; + if( istream != null ) { + istream.close(); + istream = null; + } alreadyClosed = true; } } Modified: pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java (original) +++ pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java Wed Feb 22 09:43:41 2017 @@ -47,7 +47,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.StringUtils; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.impl.PigContext; import org.apache.tools.bzip2r.BZip2Constants; import org.joda.time.DateTime; @@ -66,7 +65,6 @@ public class JarManager { BZIP2R(BZip2Constants.class), AUTOMATON(Automaton.class), ANTLR(CommonTokenStream.class), - GUAVA(Multimaps.class), 
JODATIME(DateTime.class); private final Class pkgClass; @@ -208,11 +206,8 @@ public class JarManager { public static List<String> getDefaultJars() { List<String> defaultJars = new ArrayList<String>(); for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) { - if(pkgToSend.equals(DefaultPigPackages.GUAVA) && HadoopShims.isHadoopYARN()) { - continue; //Skip - } String jar = findContainingJar(pkgToSend.getPkgClass()); - if (!defaultJars.contains(jar)) { + if (jar != null && !defaultJars.contains(jar)) { defaultJars.add(jar); } }
