Author: rohini
Date: Fri Sep 8 23:05:29 2017
New Revision: 1807845

URL: http://svn.apache.org/viewvc?rev=1807845&view=rev
Log:
PIG-5290: User Cache upload contention can cause job failures (xkrogen via rohini)

Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java pig/trunk/test/e2e/pig/tests/nightly.conf Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1807845&r1=1807844&r2=1807845&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Fri Sep 8 23:05:29 2017 @@ -48,6 +48,8 @@ OPTIMIZATIONS BUG FIXES +PIG-5290: User Cache upload contention can cause job failures (xkrogen via rohini) + PIG-5293: Suspicious code as missing `this' for a member (lifove via daijy) PIG-5294: Spark unit tests are always run in spark1 mode (szita) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1807845&r1=1807844&r2=1807845&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Sep 8 23:05:29 2017 @@ -37,6 +37,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; +import java.util.Random; import java.util.TreeMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -172,6 +173,9 @@ public class JobControlCompiler{ public static final String PIG_MAP_SEPARATOR = "_"; public HashMap<String, ArrayList<Pair<String,Long>>> globalCounters = new HashMap<String, ArrayList<Pair<String,Long>>>(); + private static final Random RAND = new Random(); + private static final String CACHE_TMP_FILE_TEMPLATE = "tmp%d.tmp"; + public static final String SMALL_JOB_LOG_MSG = "This job was 
detected as a small job, will run in-process instead"; public static final String BIG_JOB_LOG_MSG = "This job cannot be converted run in-process"; @@ -1734,7 +1738,6 @@ public class JobControlCompiler{ URL url) throws IOException { InputStream is1 = null; InputStream is2 = null; - OutputStream os = null; try { Path stagingDir = getCacheStagingDir(conf); @@ -1757,10 +1760,22 @@ public class JobControlCompiler{ is2 = url.openStream(); short replication = (short)conf.getInt(PigConfiguration.PIG_USER_CACHE_REPLICATION, conf.getInt("mapred.submit.replication", 10)); - os = fs.create(cacheFile, replication); - fs.setPermission(cacheFile, FileLocalizer.OWNER_ONLY_PERMS); - IOUtils.copyBytes(is2, os, 4096, true); + Path tempCacheFile = new Path(cacheDir, + String.format(CACHE_TMP_FILE_TEMPLATE, RAND.nextInt())); + try { + try (OutputStream os = fs.create(tempCacheFile, replication)) { + fs.setPermission(tempCacheFile, FileLocalizer.OWNER_ONLY_PERMS); + IOUtils.copyBytes(is2, os, 4096, true); + } + fs.rename(tempCacheFile, cacheFile); + } catch (IOException ioe) { + // Attempt some cleanup to avoid leaving tmp files around + if (fs.exists(tempCacheFile)) { + fs.delete(tempCacheFile, false); + } + throw ioe; + } return cacheFile; } catch (IOException ioe) { @@ -1769,10 +1784,6 @@ public class JobControlCompiler{ } finally { org.apache.commons.io.IOUtils.closeQuietly(is1); org.apache.commons.io.IOUtils.closeQuietly(is2); - // IOUtils should not close stream to HDFS quietly - if (os != null) { - os.close(); - } } } Modified: pig/trunk/test/e2e/pig/tests/nightly.conf URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1807845&r1=1807844&r2=1807845&view=diff ============================================================================== --- pig/trunk/test/e2e/pig/tests/nightly.conf (original) +++ pig/trunk/test/e2e/pig/tests/nightly.conf Fri Sep 8 23:05:29 2017 @@ -4962,7 +4962,7 @@ store C into ':OUTPATH:';\, 'tests' => [ { 'num' => 1, - 'java_params' 
=> ['-Dopt.fetch=false'], + 'java_params' => ['-Dopt.fetch=false', '-Dpig.user.cache.enabled=true'], 'execonly' => 'mapred,tez,spark', # since distributed cache is not supported in local mode 'pig' => q? register :FUNCPATH:/testudf.jar; @@ -4979,6 +4979,7 @@ store C into ':OUTPATH:';\, 'tests' => [ { 'num' => 1, + 'java_params' => ['-Dpig.user.cache.enabled=true'], 'pig' => q?register :FUNCPATH:/testudf.jar; define gm org.apache.pig.test.udf.evalfunc.GoodMonitored(); a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); @@ -4989,6 +4990,7 @@ store C into ':OUTPATH:';\, store b into ':OUTPATH:';?, },{ 'num' => 2, + 'java_params' => ['-Dpig.user.cache.enabled=true'], 'pig' => q?register :FUNCPATH:/testudf.jar; define bad org.apache.pig.test.udf.evalfunc.BadMonitored(); a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa); @@ -5001,6 +5003,7 @@ store C into ':OUTPATH:';\, store b into ':OUTPATH:';?, },{ 'num' => 3, + 'java_params' => ['-Dpig.user.cache.enabled=true'], 'pig' => q?register :FUNCPATH:/testudf.jar; define bad org.apache.pig.test.udf.evalfunc.BadMonitored(); a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);