Author: rohini
Date: Fri Sep  8 23:05:29 2017
New Revision: 1807845

URL: http://svn.apache.org/viewvc?rev=1807845&view=rev
Log:
PIG-5290: User Cache upload contention can cause job failures (xkrogen via rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/test/e2e/pig/tests/nightly.conf

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1807845&r1=1807844&r2=1807845&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Sep  8 23:05:29 2017
@@ -48,6 +48,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5290: User Cache upload contention can cause job failures (xkrogen via rohini)
+
 PIG-5293: Suspicious code as missing `this' for a member (lifove via daijy)
 
 PIG-5294: Spark unit tests are always run in spark1 mode (szita)

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1807845&r1=1807844&r2=1807845&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
 Fri Sep  8 23:05:29 2017
@@ -37,6 +37,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.Random;
 import java.util.TreeMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -172,6 +173,9 @@ public class JobControlCompiler{
     public static final String PIG_MAP_SEPARATOR = "_";
     public HashMap<String, ArrayList<Pair<String,Long>>> globalCounters = new 
HashMap<String, ArrayList<Pair<String,Long>>>();
 
+    private static final Random RAND = new Random();
+    private static final String CACHE_TMP_FILE_TEMPLATE = "tmp%d.tmp";
+
     public static final String SMALL_JOB_LOG_MSG = "This job was detected as a 
small job, will run in-process instead";
     public static final String BIG_JOB_LOG_MSG = "This job cannot be converted 
run in-process";
 
@@ -1734,7 +1738,6 @@ public class JobControlCompiler{
             URL url) throws IOException {
         InputStream is1 = null;
         InputStream is2 = null;
-        OutputStream os = null;
 
         try {
             Path stagingDir = getCacheStagingDir(conf);
@@ -1757,10 +1760,22 @@ public class JobControlCompiler{
             is2 = url.openStream();
             short replication = 
(short)conf.getInt(PigConfiguration.PIG_USER_CACHE_REPLICATION,
                     conf.getInt("mapred.submit.replication", 10));
-            os = fs.create(cacheFile, replication);
-            fs.setPermission(cacheFile, FileLocalizer.OWNER_ONLY_PERMS);
-            IOUtils.copyBytes(is2, os, 4096, true);
+            Path tempCacheFile = new Path(cacheDir,
+                String.format(CACHE_TMP_FILE_TEMPLATE, RAND.nextInt()));
 
+            try {
+                try (OutputStream os = fs.create(tempCacheFile, replication)) {
+                    fs.setPermission(tempCacheFile, 
FileLocalizer.OWNER_ONLY_PERMS);
+                    IOUtils.copyBytes(is2, os, 4096, true);
+                }
+                fs.rename(tempCacheFile, cacheFile);
+            } catch (IOException ioe) {
+                // Attempt some cleanup to avoid leaving tmp files around
+                if (fs.exists(tempCacheFile)) {
+                    fs.delete(tempCacheFile, false);
+                }
+                throw ioe;
+            }
             return cacheFile;
 
         } catch (IOException ioe) {
@@ -1769,10 +1784,6 @@ public class JobControlCompiler{
         } finally {
             org.apache.commons.io.IOUtils.closeQuietly(is1);
             org.apache.commons.io.IOUtils.closeQuietly(is2);
-            // IOUtils should not close stream to HDFS quietly
-            if (os != null) {
-                os.close();
-            }
         }
     }
 

Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1807845&r1=1807844&r2=1807845&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Fri Sep  8 23:05:29 2017
@@ -4962,7 +4962,7 @@ store C into ':OUTPATH:';\,
             'tests' => [
                 {
                     'num' => 1,
-                    'java_params' => ['-Dopt.fetch=false'],
+                    'java_params' => ['-Dopt.fetch=false', 
'-Dpig.user.cache.enabled=true'],
                     'execonly' => 'mapred,tez,spark', # since distributed 
cache is not supported in local mode
                     'pig' => q?
                         register :FUNCPATH:/testudf.jar;
@@ -4979,6 +4979,7 @@ store C into ':OUTPATH:';\,
                 'tests' => [
                     {
                         'num' => 1,
+                        'java_params' => ['-Dpig.user.cache.enabled=true'],
                         'pig' => q?register :FUNCPATH:/testudf.jar;
                                 define gm 
org.apache.pig.test.udf.evalfunc.GoodMonitored();
                                 a = load ':INPATH:/singlefile/studenttab10k' 
as (name, age, gpa);
@@ -4989,6 +4990,7 @@ store C into ':OUTPATH:';\,
                                                 store b into ':OUTPATH:';?,
                     },{
                         'num' => 2,
+                        'java_params' => ['-Dpig.user.cache.enabled=true'],
                         'pig' => q?register :FUNCPATH:/testudf.jar;
                                 define bad 
org.apache.pig.test.udf.evalfunc.BadMonitored();
                                 a = load ':INPATH:/singlefile/studenttab10k' 
as (name, age, gpa);
@@ -5001,6 +5003,7 @@ store C into ':OUTPATH:';\,
                                                 store b into ':OUTPATH:';?,
                     },{
                         'num' => 3,
+                        'java_params' => ['-Dpig.user.cache.enabled=true'],
                         'pig' => q?register :FUNCPATH:/testudf.jar;
                                 define bad 
org.apache.pig.test.udf.evalfunc.BadMonitored();
                                 a = load ':INPATH:/singlefile/studenttab10k' 
as (name, age, gpa);


Reply via email to