Author: rohini
Date: Wed Mar 11 05:14:02 2015
New Revision: 1665762

URL: http://svn.apache.org/r1665762
Log:
PIG-4443: Additional patch to fix wrong split being read in case of TEZ-2192

Modified:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1665762&r1=1665761&r2=1665762&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Mar 11 05:14:02 2015
@@ -22,16 +22,20 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
@@ -43,6 +47,9 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
@@ -173,7 +180,9 @@ public class TezDagBuilder extends TezOp
     private Map<String, LocalResource> localResources;
     private PigContext pc;
     private Configuration globalConf;
+    private FileSystem fs;
     private long intermediateTaskInputSize;
+    private Set<String> inputSplitInDiskVertices;
 
     public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
             Map<String, LocalResource> localResources) {
@@ -182,6 +191,7 @@ public class TezDagBuilder extends TezOp
         this.globalConf = 
ConfigurationUtil.toConfiguration(pc.getProperties(), true);
         this.localResources = localResources;
         this.dag = dag;
+        this.inputSplitInDiskVertices = new HashSet<String>();
 
         try {
             // Add credentials from binary token file and get tokens for 
namenodes
@@ -192,7 +202,8 @@ public class TezDagBuilder extends TezOp
         }
 
         try {
-            intermediateTaskInputSize = 
HadoopShims.getDefaultBlockSize(FileSystem.get(globalConf), 
FileLocalizer.getTemporaryResourcePath(pc));
+            fs = FileSystem.get(globalConf);
+            intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(fs, 
FileLocalizer.getTemporaryResourcePath(pc));
         } catch (Exception e) {
             log.warn("Unable to get the block size for temporary directory, 
defaulting to 128MB", e);
             intermediateTaskInputSize = 134217728L;
@@ -205,6 +216,40 @@ public class TezDagBuilder extends TezOp
                         InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
     }
 
+    /** Hack to turn off relocalization till TEZ-2192 is fixed. Must run after visit(), which records the split-on-disk vertices. */
+    public void avoidContainerReuseIfInputSplitInDisk() throws IOException { // IOException from creating/stat-ing the empty job.split
+        if (!inputSplitInDiskVertices.isEmpty()) { // no-op unless some vertex wrote its input splits to disk
+            // Create an empty job.split and add it as a task-local resource to every vertex
+            // that does NOT read splits from disk, so that those containers are not reused
+            // by the vertices that do read splits from disk (they carry their own job.split).
+            Path jobSplitFile = new Path(FileLocalizer.getTemporaryPath(pc),
+                    MRJobConfig.JOB_SPLIT);
+            FSDataOutputStream out = fs.create(jobSplitFile); // NOTE(review): fs stays null if FileSystem.get failed in the constructor
+            out.close(); // intentionally zero-length: only the presence of the resource matters
+            log.info("Creating empty job.split in " + jobSplitFile);
+            FileStatus splitFileStatus = fs.getFileStatus(jobSplitFile); // size and mtime are needed for the LocalResource
+            LocalResource localResource = LocalResource.newInstance(
+                    ConverterUtils.getYarnUrlFromPath(jobSplitFile),
+                    LocalResourceType.FILE,
+                    LocalResourceVisibility.APPLICATION,
+                    splitFileStatus.getLen(),
+                    splitFileStatus.getModificationTime());
+            for (Vertex vertex : dag.getVertices()) {
+                if (!inputSplitInDiskVertices.contains(vertex.getName())) { // skip vertices that read real splits from disk
+                    if (vertex.getTaskLocalFiles().containsKey(
+                            MRJobConfig.JOB_SPLIT)) {
+                        throw new RuntimeException( // never silently overwrite an existing job.split resource
+                                "LocalResources already contains a"
+                                        + " resource named "
+                                        + MRJobConfig.JOB_SPLIT);
+                    }
+                    vertex.getTaskLocalFiles().put(MRJobConfig.JOB_SPLIT, // NOTE(review): assumes getTaskLocalFiles() returns the vertex's live map
+                            localResource);
+                }
+            }
+        }
+    }
+
     @Override
     public void visitTezOp(TezOperator tezOp) throws VisitorException {
         TezOperPlan tezPlan = getPlan();
@@ -658,19 +703,20 @@ public class TezDagBuilder extends TezOp
                             
org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
                             false);
                     // Write splits to disk
-                    FileSystem fs = FileSystem.get(payloadConf);
                     Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
                     log.info("Writing input splits to " + inputSplitsDir
+                            + " for vertex " + vertex.getName()
                             + " as the serialized size in memory is "
                             + splitsSerializedSize + ". Configured "
                             + 
PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD
                             + " is " + spillThreshold);
-                    inputSplitInfo = MRToTezHelper.convertToInputSplitInfoDisk(
+                    inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk(
                             (InputSplitInfoMem)inputSplitInfo, inputSplitsDir, 
payloadConf, fs);
                     additionalLocalResources = new HashMap<String, 
LocalResource>();
                     MRToTezHelper.updateLocalResourcesForInputSplits(
                             fs, inputSplitInfo,
                             additionalLocalResources);
+                    inputSplitInDiskVertices.add(vertex.getName());
                 } else {
                     // Send splits via RPC to AM
                     userPayLoadBuilder.setSplits(splitsProto);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1665762&r1=1665761&r2=1665762&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Wed Mar 11 05:14:02 2015
@@ -60,6 +60,7 @@ public class TezJobCompiler {
         tezDag.setCredentials(tezPlanNode.getTezOperPlan().getCredentials());
         TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, 
tezPlanNode.getTezOperPlan(), tezDag, localResources);
         dagBuilder.visit();
+        dagBuilder.avoidContainerReuseIfInputSplitInDisk();
         return tezDag;
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1665762&r1=1665761&r2=1665762&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Wed Mar 11 05:14:02 2015
@@ -226,11 +226,9 @@ public class MRToTezHelper {
     /**
      * Write input splits (job.split and job.splitmetainfo) to disk
      */
-    public static InputSplitInfoDisk convertToInputSplitInfoDisk(
+    public static InputSplitInfoDisk writeInputSplitInfoToDisk(
             InputSplitInfoMem infoMem, Path inputSplitsDir, JobConf jobConf,
             FileSystem fs) throws IOException, InterruptedException {
-        LOG.info("Generating new input splits" + ", splitsDir="
-                + inputSplitsDir.toString());
 
         InputSplit[] splits = infoMem.getNewFormatSplits();
         JobSplitWriter.createSplitFiles(inputSplitsDir, jobConf, fs, splits);


Reply via email to