Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Sat Jan 7 00:11:37 2017 @@ -267,7 +267,7 @@ public class PigInputFormat extends Inpu jobcontext.getJobID())); List<InputSplit> oneInputPigSplits = getPigSplits( oneInputSplits, i, inpTargets.get(i), - HadoopShims.getDefaultBlockSize(fs, isFsPath? path: fs.getWorkingDirectory()), + fs.getDefaultBlockSize(isFsPath? path: fs.getWorkingDirectory()), combinable, confClone); splits.addAll(oneInputPigSplits); } catch (ExecException ee) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Sat Jan 7 00:11:37 2017 @@ -18,7 +18,6 @@ package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; import java.io.IOException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; @@ -156,12 +155,7 @@ public class PigOutputCommitter extends for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) { if (mapCommitter.first!=null) { try { - // Use reflection, Hadoop 1.x line does not have such method - Method m = mapCommitter.first.getClass().getMethod("isRecoverySupported"); - allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery - && (Boolean)m.invoke(mapCommitter.first); - } catch (NoSuchMethodException e) { - allOutputCommitterSupportRecovery = false; + allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && mapCommitter.first.isRecoverySupported(); } catch (Exception e) { throw new RuntimeException(e); } @@ -173,12 +167,7 @@ public class PigOutputCommitter extends reduceOutputCommitters) { if (reduceCommitter.first!=null) { try { - // Use reflection, Hadoop 1.x line does not have such method - Method m = reduceCommitter.first.getClass().getMethod("isRecoverySupported"); - allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery - && (Boolean)m.invoke(reduceCommitter.first); - } catch (NoSuchMethodException e) { - allOutputCommitterSupportRecovery = false; + allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && reduceCommitter.first.isRecoverySupported(); } catch (Exception e) { throw new RuntimeException(e); } @@ -197,10 +186,7 @@ public class PigOutputCommitter extends mapCommitter.second); try { // Use reflection, Hadoop 1.x line does not have such method - Method m = mapCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class); - m.invoke(mapCommitter.first, updatedContext); - } catch (NoSuchMethodException e) { - // We are using Hadoop 1.x, ignore + mapCommitter.first.recoverTask(updatedContext); } catch (Exception e) { throw new IOException(e); } @@ -212,11 +198,7 @@ public class PigOutputCommitter extends TaskAttemptContext updatedContext = setUpContext(context, reduceCommitter.second); try { - // Use reflection, Hadoop 1.x line does not have such method - Method m = reduceCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class); - m.invoke(reduceCommitter.first, updatedContext); - } catch (NoSuchMethodException e) { - // We are using Hadoop 1.x, ignore + reduceCommitter.first.recoverTask(updatedContext); } catch (Exception e) { throw new IOException(e); } @@ -256,10 +238,7 @@ public class PigOutputCommitter extends mapCommitter.second); // PIG-2642 promote files before calling storeCleanup/storeSchema try { - // Use reflection, 20.2 does not have such method - Method m = mapCommitter.first.getClass().getMethod("commitJob", JobContext.class); - m.setAccessible(true); - m.invoke(mapCommitter.first, updatedContext); + mapCommitter.first.commitJob(updatedContext); } catch (Exception e) { throw new IOException(e); } @@ -273,10 +252,7 @@ public class PigOutputCommitter extends reduceCommitter.second); // PIG-2642 promote files before calling storeCleanup/storeSchema try { - // Use reflection, 20.2 does not have such method - Method m = reduceCommitter.first.getClass().getMethod("commitJob", JobContext.class); - m.setAccessible(true); - m.invoke(reduceCommitter.first, updatedContext); + reduceCommitter.first.commitJob(updatedContext); } catch (Exception e) { throw new IOException(e); } @@ -293,10 +269,7 @@ public class PigOutputCommitter extends JobContext updatedContext = setUpContext(context, mapCommitter.second); try { - // Use reflection, 20.2 does not have such method - Method m = mapCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class); - m.setAccessible(true); - m.invoke(mapCommitter.first, updatedContext, state); + mapCommitter.first.abortJob(updatedContext, state); } catch (Exception e) { throw new IOException(e); } @@ -309,10 +282,7 @@ public class PigOutputCommitter extends JobContext updatedContext = setUpContext(context, reduceCommitter.second); try { - // Use reflection, 20.2 does not have such method - Method m = reduceCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class); - m.setAccessible(true); - m.invoke(reduceCommitter.first, updatedContext, state); + reduceCommitter.first.abortJob(updatedContext, state); } catch (Exception e) { throw new IOException(e); } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Sat Jan 7 00:11:37 2017 @@ -19,7 +19,6 @@ package org.apache.pig.backend.hadoop.executionengine.tez; import java.io.IOException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -43,6 +42,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -88,7 +88,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; @@ -285,7 +284,7 @@ public class TezDagBuilder extends TezOp try { fs = FileSystem.get(globalConf); - intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(fs, FileLocalizer.getTemporaryResourcePath(pc)); + intermediateTaskInputSize = fs.getDefaultBlockSize(FileLocalizer.getTemporaryResourcePath(pc)); } catch (Exception e) { log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e); intermediateTaskInputSize = 134217728L; @@ -1428,22 +1427,12 @@ public class TezDagBuilder extends TezOp private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) { // the OutputFormat we report to Hadoop is always PigOutputFormat which - // can be wrapped with LazyOutputFormat provided if it is supported by - // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set + // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) { - try { - Class<?> clazz = PigContext - .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat"); - Method method = clazz.getMethod("setOutputFormatClass", - org.apache.hadoop.mapreduce.Job.class, Class.class); - method.invoke(null, job, PigOutputFormatTez.class); - } catch (Exception e) { - job.setOutputFormatClass(PigOutputFormatTez.class); - log.warn(PigConfiguration.PIG_OUTPUT_LAZY - + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used"); - } + LazyOutputFormat.setOutputFormatClass(job,PigOutputFormatTez.class); } else { job.setOutputFormatClass(PigOutputFormatTez.class); } } + } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Sat Jan 7 00:11:37 2017 @@ -18,7 +18,6 @@ package org.apache.pig.backend.hadoop.hb import java.io.IOException; import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.lang.reflect.UndeclaredThrowableException; import java.math.BigDecimal; import java.math.BigInteger; @@ -65,6 +64,7 @@ import org.apache.hadoop.hbase.mapreduce import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableSplit; +import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.JobConf; @@ -86,7 +86,6 @@ import org.apache.pig.ResourceSchema.Res import org.apache.pig.StoreFuncInterface; import org.apache.pig.StoreResources; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder; import org.apache.pig.builtin.FuncUtils; import org.apache.pig.builtin.Utf8StorageConverter; @@ -787,46 +786,35 @@ public class HBaseStorage extends LoadFu public List<String> getShipFiles() { // Depend on HBase to do the right thing when available, as of HBASE-9165 try { - Method addHBaseDependencyJars = - TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class); - if (addHBaseDependencyJars != null) { - Configuration conf = new Configuration(); - addHBaseDependencyJars.invoke(null, conf); - if (conf.get("tmpjars") != null) { - String[] tmpjars = conf.getStrings("tmpjars"); - List<String> shipFiles = new ArrayList<String>(tmpjars.length); - for (String tmpjar : tmpjars) { - shipFiles.add(new URL(tmpjar).getPath()); - } - return shipFiles; + Configuration conf = new Configuration(); + TableMapReduceUtil.addHBaseDependencyJars(conf); + if (conf.get("tmpjars") != null) { + String[] tmpjars = conf.getStrings("tmpjars"); + List<String> shipFiles = new ArrayList<String>(tmpjars.length); + for (String tmpjar : tmpjars) { + shipFiles.add(new URL(tmpjar).getPath()); } + return shipFiles; + } + } catch (IOException e) { + if(e instanceof MalformedURLException){ + LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars" + + " had malformed url. Falling back to previous logic.", e); + }else { + LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation" + + " failed. Falling back to previous logic.", e); } - } catch (NoSuchMethodException e) { - LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available." - + " Falling back to previous logic.", e); - } catch (IllegalAccessException e) { - LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation" - + " not permitted. Falling back to previous logic.", e); - } catch (InvocationTargetException e) { - LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation" - + " failed. Falling back to previous logic.", e); - } catch (MalformedURLException e) { - LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars" - + " had malformed url. Falling back to previous logic.", e); } List<Class> classList = new ArrayList<Class>(); classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server - if (!HadoopShims.isHadoopYARN()) { //Avoid shipping duplicate. Hadoop 0.23/2 itself has guava - classList.add(com.google.common.collect.Lists.class); // guava - } classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper // Additional jars that are specific to v0.95.0+ addClassToList("org.cloudera.htrace.Trace", classList); // htrace addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common - addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compar + addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compat addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty return FuncUtils.getShipFiles(classList); } @@ -877,27 +865,13 @@ public class HBaseStorage extends LoadFu } if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) { - // Will not be entering this block for 0.20.2 as it has no security. try { - // getCurrentUser method is not public in 0.20.2 - Method m1 = UserGroupInformation.class.getMethod("getCurrentUser"); - UserGroupInformation currentUser = (UserGroupInformation) m1.invoke(null,(Object[]) null); - // hasKerberosCredentials method not available in 0.20.2 - Method m2 = UserGroupInformation.class.getMethod("hasKerberosCredentials"); - boolean hasKerberosCredentials = (Boolean) m2.invoke(currentUser, (Object[]) null); - if (hasKerberosCredentials) { - // Class and method are available only from 0.92 security release - Class tokenUtilClass = Class - .forName("org.apache.hadoop.hbase.security.token.TokenUtil"); - Method m3 = tokenUtilClass.getMethod("obtainTokenForJob", new Class[] { - Configuration.class, UserGroupInformation.class, Job.class }); - m3.invoke(null, new Object[] { hbaseConf, currentUser, job }); + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + if (currentUser.hasKerberosCredentials()) { + TokenUtil.obtainTokenForJob(hbaseConf,currentUser,job); } else { LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available"); } - } catch (ClassNotFoundException cnfe) { - throw new RuntimeException("Failure loading TokenUtil class, " - + "is secure RPC available?", cnfe); } catch (RuntimeException re) { throw re; } catch (Exception e) { Modified: pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java (original) +++ pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java Sat Jan 7 00:11:37 2017 @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.shims.Hadoop23Shims; import org.apache.hadoop.hive.shims.HadoopShimsSecure; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.Counters; @@ -180,20 +181,9 @@ abstract class HiveUDFBase extends EvalF @Override public List<String> getShipFiles() { - String hadoopVersion = "20S"; - if (Utils.isHadoop23() || Utils.isHadoop2()) { - hadoopVersion = "23"; - } - Class hadoopVersionShimsClass; - try { - hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" + - hadoopVersion + "Shims"); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath"); - } List<String> files = FuncUtils.getShipFiles(new Class[] {GenericUDF.class, - PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class, - hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class}); + PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class, + Hadoop23Shims.class, HadoopShimsSecure.class, Collector.class}); return files; } Modified: pig/trunk/src/org/apache/pig/builtin/OrcStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/OrcStorage.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/builtin/OrcStorage.java (original) +++ pig/trunk/src/org/apache/pig/builtin/OrcStorage.java Sat Jan 7 00:11:37 2017 @@ -56,6 +56,7 @@ import org.apache.hadoop.hive.serde2.obj import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.shims.Hadoop23Shims; import org.apache.hadoop.hive.shims.HadoopShimsSecure; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; @@ -389,20 +390,8 @@ public class OrcStorage extends LoadFunc @Override public List<String> getShipFiles() { - List<String> cacheFiles = new ArrayList<String>(); - String hadoopVersion = "20S"; - if (Utils.isHadoop23() || Utils.isHadoop2()) { - hadoopVersion = "23"; - } - Class hadoopVersionShimsClass; - try { - hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" + - hadoopVersion + "Shims"); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath"); - } Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class, - org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass, + org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, Hadoop23Shims.class, Input.class}; return FuncUtils.getShipFiles(classList); } Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original) +++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Sat Jan 7 00:11:37 2017 @@ -68,7 +68,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.bzip2r.Bzip2TextInputFormat; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; @@ -412,7 +411,7 @@ LoadPushDown, LoadMetadata, StoreMetadat @Override public InputFormat getInputFormat() { if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) - && (!bzipinput_usehadoops || !HadoopShims.isHadoopYARN()) ) { + && (!bzipinput_usehadoops) ) { mLog.info("Using Bzip2TextInputFormat"); return new Bzip2TextInputFormat(); } else { Modified: pig/trunk/src/org/apache/pig/builtin/TextLoader.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/TextLoader.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/builtin/TextLoader.java (original) +++ pig/trunk/src/org/apache/pig/builtin/TextLoader.java Sat Jan 7 00:11:37 2017 @@ -37,7 +37,6 @@ import org.apache.pig.ResourceSchema.Res import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.bzip2r.Bzip2TextInputFormat; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataByteArray; @@ -259,8 +258,7 @@ public class TextLoader extends LoadFunc @Override public InputFormat getInputFormat() { if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) - && !HadoopShims.isHadoopYARN() - && !bzipinput_usehadoops ) { + && !bzipinput_usehadoops ) { mLog.info("Using Bzip2TextInputFormat"); return new Bzip2TextInputFormat(); } else { Modified: pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original) +++ pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Sat Jan 7 00:11:37 2017 @@ -90,7 +90,9 @@ public class PoissonSampleLoader extends // number of tuples to be skipped Tuple t = loader.getNext(); if(t == null) { - return createNumRowTuple(null); + // since skipInterval is -1, no previous sample, + // and next sample is null -> the data set is empty + return null; } long availRedMem = (long) ( totalMemory * heapPerc); // availRedMem = 155084396; Modified: pig/trunk/src/org/apache/pig/impl/io/PigFile.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/PigFile.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/io/PigFile.java (original) +++ pig/trunk/src/org/apache/pig/impl/io/PigFile.java Sat Jan 7 00:11:37 2017 @@ -102,7 +102,7 @@ public class PigFile { if(oc.needsTaskCommit(tac)) { oc.commitTask(tac); } - HadoopShims.commitOrCleanup(oc, jc); + oc.commitJob(jc); } @Override Modified: pig/trunk/src/org/apache/pig/impl/util/JarManager.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original) +++ pig/trunk/src/org/apache/pig/impl/util/JarManager.java Sat Jan 7 00:11:37 2017 @@ -47,7 +47,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.StringUtils; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.impl.PigContext; import org.apache.tools.bzip2r.BZip2Constants; import org.joda.time.DateTime; @@ -66,7 +65,6 @@ public class JarManager { BZIP2R(BZip2Constants.class), AUTOMATON(Automaton.class), ANTLR(CommonTokenStream.class), - GUAVA(Multimaps.class), JODATIME(DateTime.class); private final Class pkgClass; @@ -208,9 +206,6 @@ public class JarManager { public static List<String> getDefaultJars() { List<String> defaultJars = new ArrayList<String>(); for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) { - if(pkgToSend.equals(DefaultPigPackages.GUAVA) && HadoopShims.isHadoopYARN()) { - continue; //Skip - } String jar = findContainingJar(pkgToSend.getPkgClass()); if (!defaultJars.contains(jar)) { defaultJars.add(jar); Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original) +++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Sat Jan 7 00:11:37 2017 @@ -94,20 +94,6 @@ public class Utils { return System.getProperty("java.vendor").contains("IBM"); } - public static boolean isHadoop23() { - String version = org.apache.hadoop.util.VersionInfo.getVersion(); - if (version.matches("\\b0\\.23\\..+\\b")) - return true; - return false; - } - - public static boolean isHadoop2() { - String version = org.apache.hadoop.util.VersionInfo.getVersion(); - if (version.matches("\\b2\\.\\d+\\..+")) - return true; - return false; - } - public static boolean is64bitJVM() { String arch = System.getProperties().getProperty("sun.arch.data.model", System.getProperty("com.ibm.vm.bitmode")); Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original) +++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Sat Jan 7 00:11:37 2017 @@ -24,7 +24,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; +import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil; import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats; /** @@ -71,7 +71,7 @@ public class PigStatsUtil { */ @Deprecated public static final String FS_COUNTER_GROUP - = HadoopShims.getFsCounterGroupName(); + = MRPigStatsUtil.FS_COUNTER_GROUP; /** * Returns an empty PigStats object Use of this method is not advised as it Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original) +++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Sat Jan 7 00:11:37 2017 @@ -32,15 +32,16 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.TaskReport; +import org.apache.hadoop.mapreduce.Cluster; +import org.apache.hadoop.mapreduce.TaskReport; import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.pig.PigConfiguration; import org.apache.pig.PigCounters; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.classification.InterfaceAudience; import org.apache.pig.classification.InterfaceStability; import org.apache.pig.impl.io.FileSpec; @@ -53,6 +54,8 @@ import org.apache.pig.tools.pigstats.Out import org.apache.pig.tools.pigstats.PigStats.JobGraph; import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter; +import org.python.google.common.collect.Lists; + /** * This class encapsulates the runtime statistics of a MapReduce job. @@ -281,7 +284,7 @@ public final class MRJobStats extends Jo void addCounters(Job job) { try { - counters = HadoopShims.getCounters(job); + counters = getCounters(job); } catch (IOException e) { LOG.warn("Unable to get job counters", e); } @@ -349,13 +352,13 @@ public final class MRJobStats extends Jo void addMapReduceStatistics(Job job) { Iterator<TaskReport> maps = null; try { - maps = HadoopShims.getTaskReports(job, TaskType.MAP); + maps = getTaskReports(job, TaskType.MAP); } catch (IOException e) { LOG.warn("Failed to get map task report", e); } Iterator<TaskReport> reduces = null; try { - reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE); + reduces = getTaskReports(job, TaskType.REDUCE); } catch (IOException e) { LOG.warn("Failed to get reduce task report", e); } @@ -515,4 +518,35 @@ public final class MRJobStats extends Jo inputs.add(is); } + public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException { + if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) { + LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID()); + return null; + } + Cluster cluster = new Cluster(job.getJobConf()); + try { + org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID()); + if (mrJob == null) { // In local mode, mrJob will be null + mrJob = job.getJob(); + } + org.apache.hadoop.mapreduce.TaskReport[] reports = mrJob.getTaskReports(type); + return Lists.newArrayList(reports).iterator(); + } catch (InterruptedException ir) { + throw new IOException(ir); + } + } + + public static Counters getCounters(Job job) throws IOException { + try { + Cluster cluster = new Cluster(job.getJobConf()); + org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID()); + if (mrJob == null) { // In local mode, mrJob will be null + mrJob = job.getJob(); + } + return new Counters(mrJob.getCounters()); + } catch (Exception ir) { + throw new IOException(ir); + } + } + } Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java (original) +++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java Sat Jan 7 00:11:37 2017 @@ -33,7 +33,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.classification.InterfaceAudience.Private; import org.apache.pig.impl.PigContext; import org.apache.pig.tools.pigstats.JobStats; @@ -51,7 +50,7 @@ public class MRPigStatsUtil extends PigS public static final String TASK_COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter"; public static final String FS_COUNTER_GROUP - = HadoopShims.getFsCounterGroupName(); + = "org.apache.hadoop.mapreduce.FileSystemCounter"; private static final Log LOG = LogFactory.getLog(MRPigStatsUtil.class); Modified: pig/trunk/test/e2e/pig/build.xml URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/build.xml?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/build.xml (original) +++ pig/trunk/test/e2e/pig/build.xml Sat Jan 7 00:11:37 2017 @@ -27,9 +27,8 @@ <property name="hive.lib.dir" value="${pig.base.dir}/build/ivy/lib/Pig"/> - <condition property="hive.hadoop.shims.version" value="0.23" else="0.20S"> - <equals arg1="${hadoopversion}" arg2="23" /> - </condition> + <property name="hadoopversion" value="2" /> + <property name="hive.hadoop.shims.version" value="0.23" /> <property name="mvnrepo" value="http://repo2.maven.org/maven2"/> Modified: pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java (original) +++ pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java Sat Jan 7 00:11:37 2017 @@ -360,7 +360,7 @@ public class TestLoadStoreFuncLifeCycle // result, the number of StoreFunc instances is greater by 1 in // Hadoop-2.0.x. assertTrue("storer instanciation count increasing: " + Storer.count, - Storer.count <= (org.apache.pig.impl.util.Utils.isHadoop2() ? 5 : 4)); + Storer.count <= 5); } } Modified: pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java (original) +++ pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java Sat Jan 7 00:11:37 2017 @@ -82,41 +82,38 @@ public class TestQueryParserUtils { QueryParserUtils.setHdfsServers("hello://nn1/tmp", pc); assertEquals(null, props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - if(org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) { - // webhdfs - props.remove(MRConfiguration.JOB_HDFS_SERVERS); - QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc); - assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - QueryParserUtils.setHdfsServers("webhdfs://nn1:50070/tmp", pc); - assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - - // har with webhfs - QueryParserUtils.setHdfsServers("har://webhdfs-nn1:50070/tmp", pc); - assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - QueryParserUtils.setHdfsServers("har://webhdfs-nn2:50070/tmp", pc); - assertEquals("webhdfs://nn1,webhdfs://nn1:50070,webhdfs://nn2:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - props.remove(MRConfiguration.JOB_HDFS_SERVERS); - QueryParserUtils.setHdfsServers("har://webhdfs-nn1/tmp", pc); - assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - - //viewfs - props.remove(MRConfiguration.JOB_HDFS_SERVERS); - QueryParserUtils.setHdfsServers("viewfs:/tmp", pc); - assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - QueryParserUtils.setHdfsServers("viewfs:///tmp", pc); - assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - QueryParserUtils.setHdfsServers("viewfs://cluster1/tmp", pc); - assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - - //har with viewfs - props.remove(MRConfiguration.JOB_HDFS_SERVERS); - QueryParserUtils.setHdfsServers("har://viewfs/tmp", pc); - assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - QueryParserUtils.setHdfsServers("har://viewfs-cluster1/tmp", pc); - assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + // webhdfs + props.remove(MRConfiguration.JOB_HDFS_SERVERS); + QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc); + assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + QueryParserUtils.setHdfsServers("webhdfs://nn1:50070/tmp", pc); + assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + + // har with webhfs + QueryParserUtils.setHdfsServers("har://webhdfs-nn1:50070/tmp", pc); + assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + QueryParserUtils.setHdfsServers("har://webhdfs-nn2:50070/tmp", pc); + assertEquals("webhdfs://nn1,webhdfs://nn1:50070,webhdfs://nn2:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + props.remove(MRConfiguration.JOB_HDFS_SERVERS); + QueryParserUtils.setHdfsServers("har://webhdfs-nn1/tmp", pc); + assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + + //viewfs + props.remove(MRConfiguration.JOB_HDFS_SERVERS); + QueryParserUtils.setHdfsServers("viewfs:/tmp", pc); + assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + QueryParserUtils.setHdfsServers("viewfs:///tmp", pc); + assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + QueryParserUtils.setHdfsServers("viewfs://cluster1/tmp", pc); + assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + + //har with viewfs + props.remove(MRConfiguration.JOB_HDFS_SERVERS); + QueryParserUtils.setHdfsServers("har://viewfs/tmp", pc); + assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); + QueryParserUtils.setHdfsServers("har://viewfs-cluster1/tmp", pc); + assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS)); - - } } Modified: pig/trunk/test/org/apache/pig/test/TestBZip.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBZip.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestBZip.java (original) +++ pig/trunk/test/org/apache/pig/test/TestBZip.java Sat Jan 7 00:11:37 2017 @@ -43,7 +43,6 @@ import org.apache.hadoop.mapreduce.Input import org.apache.pig.PigServer; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; @@ -67,16 +66,10 @@ public class TestBZip { @Parameters(name = "pig.bzip.use.hadoop.inputformat = {0}.") public static Iterable<Object[]> data() { - if ( HadoopShims.isHadoopYARN() ) { - return Arrays.asList(new Object[][] { - { false }, - { true } - }); - } else { - return Arrays.asList(new Object[][] { - { false } - }); - } + return Arrays.asList(new Object[][] { + { false }, + { true } + }); } public TestBZip (Boolean useBzipFromHadoop) { Modified: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (original) +++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Sat Jan 7 00:11:37 2017 @@ -63,7 +63,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.builtin.PigStorage; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileSpec; @@ -131,7 +130,7 @@ public class TestJobControlCompiler { // verifying the jar gets on distributed cache Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf); // guava jar is not shipped with Hadoop 2.x - Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), HadoopShims.isHadoopYARN() ? 5 : 6, fileClassPaths.length); + Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 5, fileClassPaths.length); Path distributedCachePath = fileClassPaths[0]; Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName()); // hadoop bug requires path to not contain hdfs://hotname in front @@ -235,22 +234,12 @@ public class TestJobControlCompiler { // 4. another.jar and 5. udf1.jar, and not duplicate udf.jar System.out.println("cache.files= " + Arrays.toString(cacheURIs)); System.out.println("classpath.files= " + Arrays.toString(fileClassPaths)); - if (HadoopShims.isHadoopYARN()) { - // Default jars - 5 (pig, antlr, joda-time, automaton) - // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar - Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9, - Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size()); - Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9, - Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size()); - } else { - // Default jars - 5. Has guava in addition - // There will be same entries duplicated for udf.jar and udf2.jar - Assert.assertEquals("size 12 for " + Arrays.toString(cacheURIs), 12, - Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size()); - Assert.assertEquals("size 12 for " + Arrays.toString(fileClassPaths), 12, - Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size()); - } - + // Default jars - 5 (pig, antlr, joda-time, automaton) + // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar + Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9, + Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size()); + Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9, + Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size()); // Count occurrences of the resources Map<String, Integer> occurrences = new HashMap<String, Integer>(); @@ -259,22 +248,12 @@ public class TestJobControlCompiler { val = (val == null) ? 1 : ++val; occurrences.put(cacheURI.toString(), val); } - if (HadoopShims.isHadoopYARN()) { - Assert.assertEquals(9, occurrences.size()); - } else { - Assert.assertEquals(10, occurrences.size()); //guava jar in addition - } + Assert.assertEquals(9, occurrences.size()); for (String file : occurrences.keySet()) { - if (!HadoopShims.isHadoopYARN() && (file.endsWith("udf.jar") || file.endsWith("udf2.jar"))) { - // Same path added twice which is ok. It should not be a shipped to hdfs temp path. - // We assert path is same by checking count - Assert.assertEquals("Two occurrences for " + file, 2, (int) occurrences.get(file)); - } else { - // check that only single occurrence even though we added once to dist cache (simulating via Oozie) - // and second time through pig register jar when there is symlink - Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file)); - } + // check that only single occurrence even though we added once to dist cache (simulating via Oozie) + // and second time through pig register jar when there is symlink + Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file)); } } Modified: pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java (original) +++ pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java Sat Jan 7 00:11:37 2017 @@ -45,12 +45,8 @@ public abstract class TestLoaderStorerSh "store a into 'ooo';"; PhysicalPlan pp = Util.buildPp(pigServer, query); - String hadoopVersion = "20S"; - if (Utils.isHadoop23() || Utils.isHadoop2()) { - hadoopVersion = "23"; - } - String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde", - "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"}; + String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde", + "hive-shims-0.23", "hive-shims-common", "kryo"}; checkPlan(pp, expectedJars, 6, pigServer.getPigContext()); } @@ -61,12 +57,8 @@ public abstract class TestLoaderStorerSh "store a into 'ooo' using OrcStorage;"; PhysicalPlan pp = Util.buildPp(pigServer, query); - String hadoopVersion = "20S"; - if (Utils.isHadoop23() || Utils.isHadoop2()) { - hadoopVersion = "23"; - } - String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde", - "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"}; + String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde", + "hive-shims-0.23", "hive-shims-common", "kryo"}; checkPlan(pp, expectedJars, 6, pigServer.getPigContext()); } Modified: pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java (original) +++ pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java Sat Jan 7 00:11:37 2017 @@ -1558,14 +1558,7 @@ public class TestMultiQueryCompiler { MROperPlan mrp = null; try { - java.lang.reflect.Method compile = launcher.getClass() - .getDeclaredMethod("compile", - new Class[] { PhysicalPlan.class, PigContext.class }); - - compile.setAccessible(true); - - mrp = (MROperPlan) compile.invoke(launcher, new Object[] { pp, myPig.getPigContext() }); - + mrp = launcher.compile(pp, myPig.getPigContext()); Assert.assertNotNull(mrp); } catch (Exception e) { Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original) +++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Sat Jan 7 00:11:37 2017 @@ -62,6 +62,7 @@ import org.junit.AfterClass; import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -763,10 +764,9 @@ public class TestPigRunner { } @Test + @Ignore + // Skip in hadoop 23 test, see PIG-2449 public void classLoaderTest() throws Exception { - // Skip in hadoop 23 test, see PIG-2449 - if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) - return; PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE)); w.println("register test/org/apache/pig/test/data/pigtestloader.jar"); w.println("A = load '" + INPUT_FILE + "' using org.apache.pig.test.PigTestLoader();"); Modified: pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java (original) +++ pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java Sat Jan 7 00:11:37 2017 @@ -103,11 +103,7 @@ public class TestPigStatsMR extends Test private static MROperPlan getMRPlan(PhysicalPlan pp, PigContext ctx) throws Exception { MapReduceLauncher launcher = new MapReduceLauncher(); - java.lang.reflect.Method compile = launcher.getClass() - .getDeclaredMethod("compile", - new Class[] { PhysicalPlan.class, PigContext.class }); - compile.setAccessible(true); - return (MROperPlan) compile.invoke(launcher, new Object[] { pp, ctx }); + return launcher.compile(pp,ctx); } private static String getAlias(MapReduceOper mro) throws Exception { Modified: pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original) +++ pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Sat Jan 7 00:11:37 2017 @@ -65,6 +65,7 @@ public class TestSkewedJoin { private static final String INPUT_FILE5 = "SkewedJoinInput5.txt"; private static final String INPUT_FILE6 = "SkewedJoinInput6.txt"; private static final String INPUT_FILE7 = "SkewedJoinInput7.txt"; + private static final String INPUT_FILE8 = "SkewedJoinInput8.txt"; private static final String TEST_DIR = Util.getTestDirectory(TestSkewedJoin.class); private static final String INPUT_DIR = TEST_DIR + Path.SEPARATOR + "input"; private static final String OUTPUT_DIR = TEST_DIR + Path.SEPARATOR + "output"; @@ -173,6 +174,11 @@ public class TestSkewedJoin { } w7.close(); + //Empty file + PrintWriter w8 = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE8)); + w8.close(); + + Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1); Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE2, INPUT_FILE2); Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE3, INPUT_FILE3); @@ -180,6 +186,7 @@ public class TestSkewedJoin { Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE5, INPUT_FILE5); Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE6, INPUT_FILE6); Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE7, INPUT_FILE7); + Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE8, INPUT_FILE8); } private static void deleteFiles() throws IOException { @@ -187,6 +194,21 @@ public class TestSkewedJoin { } @Test + public void testSkewedJoinMapLeftEmpty() throws IOException{ + pigServer.registerQuery("A = LOAD '" + INPUT_FILE8 + "' as (idM:[]);"); + pigServer.registerQuery("B = LOAD '" + INPUT_FILE1 + "' as (id, name, n);"); + pigServer.registerQuery("C = join A by idM#'id', B by id using 'skewed' PARALLEL 2;"); + Iterator<Tuple> iter = pigServer.openIterator("C"); + int count = 0; + while(iter.hasNext()) { + count++; + iter.next(); + } + assertEquals(0, count); + } + + + @Test public void testSkewedJoinWithGroup() throws IOException{ pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);"); pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);"); Modified: pig/trunk/test/org/apache/pig/test/Util.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/test/Util.java (original) +++ pig/trunk/test/org/apache/pig/test/Util.java Sat Jan 7 00:11:37 2017 @@ -75,7 +75,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; +import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager; import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.builtin.Utf8StorageConverter; import org.apache.pig.data.BagFactory; @@ -648,13 +648,10 @@ public class Util { } } - static private String getMkDirCommandForHadoop2_0(String fileName) { - if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) { - Path parentDir = new Path(fileName).getParent(); - String mkdirCommand = parentDir.getName().isEmpty() ? "" : "fs -mkdir -p " + parentDir + "\n"; - return mkdirCommand; - } - return ""; + static private String getFSMkDirCommand(String fileName) { + Path parentDir = new Path(fileName).getParent(); + String mkdirCommand = parentDir.getName().isEmpty() ? "" : "fs -mkdir -p " + parentDir + "\n"; + return mkdirCommand; } /** @@ -676,7 +673,7 @@ public class Util { fileNameOnCluster = fileNameOnCluster.replace('\\','/'); } PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties()); - String script = getMkDirCommandForHadoop2_0(fileNameOnCluster) + "fs -put " + localFileName + " " + fileNameOnCluster; + String script = getFSMkDirCommand(fileNameOnCluster) + "fs -put " + localFileName + " " + fileNameOnCluster; GruntParser parser = new GruntParser(new StringReader(script), ps); parser.setInteractive(false); try { @@ -907,14 +904,7 @@ public class Util { MapRedUtil.checkLeafIsStore(pp, pc); MapReduceLauncher launcher = new MapReduceLauncher(); - - java.lang.reflect.Method compile = launcher.getClass() - .getDeclaredMethod("compile", - new Class[] { PhysicalPlan.class, PigContext.class }); - - compile.setAccessible(true); - - return (MROperPlan) compile.invoke(launcher, new Object[] { pp, pc }); + return launcher.compile(pp,pc); } public static MROperPlan buildMRPlan(String query, PigContext pc) throws Exception { @@ -1357,16 +1347,7 @@ public class Util { // For tez testing, we want to avoid TezResourceManager/LocalResource reuse // (when switching between local and mapreduce/tez) - if( HadoopShims.isHadoopYARN() ) { - try { - java.lang.reflect.Method tez_dropInstance = Class.forName( - "org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager").getDeclaredMethod( - "dropInstance", (Class<?>[]) null ); - tez_dropInstance.invoke(null); - } catch (Exception e){ - throw new RuntimeException(e); - } - } + TezResourceManager.dropInstance(); // TODO: once we have Tez local mode, we can get rid of this. For now, // if we run this test suite in Tez mode and there are some tests Modified: pig/trunk/test/perf/pigmix/bin/generate_data.sh URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/bin/generate_data.sh?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/test/perf/pigmix/bin/generate_data.sh (original) +++ pig/trunk/test/perf/pigmix/bin/generate_data.sh Sat Jan 7 00:11:37 2017 @@ -25,20 +25,11 @@ fi source $PIGMIX_HOME/conf/config.sh -if [ $HADOOP_VERSION == "23" ]; then - echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot" - $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot -else - echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot" - $HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot -fi +echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot" +$HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot shopt -s extglob -if [ $HADOOP_VERSION == "23" ]; then - pigjar=`echo $PIG_HOME/pig*-h2.jar` -else - pigjar=`echo $PIG_HOME/pig*-h1.jar` -fi +pigjar=`echo $PIG_HOME/pig*-h2.jar` pigmixjar=$PIGMIX_HOME/pigmix.jar Modified: pig/trunk/test/perf/pigmix/build.xml URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/build.xml?rev=1777738&r1=1777737&r2=1777738&view=diff ============================================================================== --- pig/trunk/test/perf/pigmix/build.xml (original) +++ pig/trunk/test/perf/pigmix/build.xml Sat Jan 7 00:11:37 2017 @@ -34,6 +34,8 @@ </fileset> </path> + <property name="hadoopversion" value="2" /> + <property name="java.dir" value="${basedir}/src/java"/> <property name="pigmix.build.dir" value="${basedir}/build"/> <property name="pigmix.jar" value="${basedir}/pigmix.jar"/>
