Author: rohini
Date: Fri Mar  6 21:22:53 2015
New Revision: 1664723

URL: http://svn.apache.org/r1664723
Log:
PIG-4443: Write inputsplits in Tez to disk if the size is huge and option to compress pig input splits (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
    pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1664723&r1=1664722&r2=1664723&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Mar  6 21:22:53 2015
@@ -50,6 +50,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4443: Write inputsplits in Tez to disk if the size is huge and option to compress pig input splits (rohini)
+
 PIG-4447: Pig Cannot handle nullable values (arrays and records) in avro records (rdsr via daijy)
 
 PIG-4444: Fix unit test failure TestTezAutoParallelism (daijy)

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1664723&r1=1664722&r2=1664723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Fri Mar  6 21:22:53 2015
@@ -118,6 +118,21 @@ public class PigConfiguration {
      */
     public static final String PIG_TEZ_AUTO_PARALLELISM = 
"pig.tez.auto.parallelism";
 
+    /**
+     * This key is used to configure compression for the pig input splits which
+     * are not FileSplit. Default is false
+     */
+    public static final String PIG_COMPRESS_INPUT_SPLITS = 
"pig.compress.input.splits";
+    public static final boolean PIG_COMPRESS_INPUT_SPLITS_DEFAULT = false;
+
+    /**
+     * Serialize input splits to disk if the input splits size exceeds a
+     * threshold to avoid hitting default RPC transfer size limit of 64MB.
+     * Default is 33554432 (32MB)
+     */
+    public static final String PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD = 
"pig.tez.input.splits.mem.threshold";
+    public static final int PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD_DEFAULT = 
33554432;
+
     // Pig UDF profiling settings
     /**
      * Controls whether execution time of Pig UDFs should be tracked.

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1664723&r1=1664722&r2=1664723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Fri Mar  6 21:22:53 2015
@@ -20,7 +20,9 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -30,13 +32,15 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.List;
-import java.util.HashSet;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
-import java.lang.StringBuilder;
+import java.util.Set;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -46,7 +50,10 @@ import org.apache.hadoop.io.serializer.S
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.data.WritableByteArray;
 import org.apache.pig.impl.plan.OperatorKey;
 
 /**
@@ -59,19 +66,22 @@ import org.apache.pig.impl.plan.Operator
  * wrapped InputSplit.
  */
 public class PigSplit extends InputSplit implements Writable, Configurable {
+
+
+    private static String FILESPLIT_CLASSNAME = FileSplit.class.getName();
     //The operators to which the tuples from this
     //input file are attached. These are the successors
     //of the load operator representing this input
     private ArrayList<OperatorKey> targetOps;
 
     // index starting from 0 representing the input number
-    // So if we have 3 inputs (say for a 3 way join), then the 
+    // So if we have 3 inputs (say for a 3 way join), then the
     // splits corresponding to the first input will have an index of 0, those
     // corresponding to the second will have an index of 1 and so on
     // This will be used to get the LoadFunc corresponding to the input
     // in PigInputFormat and related code.
     private int inputIndex;
-    
+
     // The real InputSplit this split is wrapping
     private InputSplit[] wrappedSplits;
 
@@ -80,36 +90,36 @@ public class PigSplit extends InputSplit
     // This will be used by MergeJoinIndexer to record the split # in the
     // index
     private int splitIndex;
-    
+
     // index of current splits being process
     private int currentIdx;
-    
+
     // the flag indicates this is a multi-input join (i.e. join)
-    // so that custom Hadoop counters will be created in the 
+    // so that custom Hadoop counters will be created in the
     // back-end to track the number of records for each input.
     private boolean isMultiInputs = false;
-    
+
     // the flag indicates the custom Hadoop counter should be disabled.
     // This is to prevent the number of counters exceeding the limit.
     // This flag is controlled by Pig property "pig.disable.counter" (
     // the default value is 'false').
     private boolean disableCounter = false;
-    
+
     /**
      * the job Configuration
      */
     private Configuration conf;
-    
+
     /**
      * total number of splits - required by skew join
      */
     private int totalSplits;
-    
+
     /**
      * total length
      */
     private long length = -1;
-    
+
     /**
      * overall locations
      */
@@ -118,8 +128,8 @@ public class PigSplit extends InputSplit
     // this seems necessary for Hadoop to instatiate this split on the
     // backend
     public PigSplit() {}
-    
-    public PigSplit(InputSplit[] wrappedSplits, int inputIndex, 
+
+    public PigSplit(InputSplit[] wrappedSplits, int inputIndex,
             List<OperatorKey> targetOps, int splitIndex) {
         this.wrappedSplits = wrappedSplits;
         this.inputIndex = inputIndex;
@@ -127,30 +137,30 @@ public class PigSplit extends InputSplit
         this.splitIndex = splitIndex;
         this.currentIdx = 0;
     }
-    
+
     public List<OperatorKey> getTargetOps() {
         return new ArrayList<OperatorKey>(targetOps);
     }
-    
+
 
     /**
-     * This methods returns the actual InputSplit (as returned by the 
+     * This methods returns the actual InputSplit (as returned by the
      * {@link InputFormat}) which this class is wrapping.
      * @return the wrappedSplit
      */
     public InputSplit getWrappedSplit() {
         return wrappedSplits[currentIdx];
     }
-    
+
     /**
-     * 
+     *
      * @param idx the index into the wrapped splits
      * @return the specified wrapped split
      */
     public InputSplit getWrappedSplit(int idx) {
         return wrappedSplits[idx];
     }
-    
+
     @Override
     @SuppressWarnings("unchecked")
     public String[] getLocations() throws IOException, InterruptedException {
@@ -200,7 +210,7 @@ public class PigSplit extends InputSplit
         }
         return length;
     }
-    
+
     /**
      * Return the length of a wrapped split
      * @param idx the index into the wrapped splits
@@ -210,6 +220,7 @@ public class PigSplit extends InputSplit
         return wrappedSplits[idx].getLength();
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public void readFields(DataInput is) throws IOException {
         disableCounter = is.readBoolean();
@@ -220,32 +231,59 @@ public class PigSplit extends InputSplit
         targetOps = (ArrayList<OperatorKey>) readObject(is);
         int splitLen = is.readInt();
         int distinctSplitClassCount = is.readInt();
+        boolean nonFileSplit = false;
         //construct the input split class name list
+
         String[] distinctSplitClassName = new String[distinctSplitClassCount];
         for (int i = 0; i < distinctSplitClassCount; i++) {
             distinctSplitClassName[i] = is.readUTF();
+            if (!distinctSplitClassName[i].equals(FILESPLIT_CLASSNAME)) {
+                nonFileSplit = true;
+            }
         }
         try {
             SerializationFactory sf = new SerializationFactory(conf);
             // The correct call sequence for Deserializer is, we shall open, 
then deserialize, but we shall not close
             wrappedSplits = new InputSplit[splitLen];
+
+            if (splitLen <= 0) {
+                return;
+            }
+
+            // Do not compress if everything is FileSplit as it does not 
compress much
+            // but adds few seconds for 30K+ tasks
+            boolean compress = nonFileSplit && conf.getBoolean(
+                    PigConfiguration.PIG_COMPRESS_INPUT_SPLITS,
+                    PigConfiguration.PIG_COMPRESS_INPUT_SPLITS_DEFAULT);
+            DataInputStream dis = null;
+            if (compress) {
+                int numBytes = is.readInt();
+                byte[] buf = new byte[numBytes];
+                is.readFully(buf, 0, numBytes);
+                dis = new DataInputStream(new InflaterInputStream(new 
ByteArrayInputStream(buf)));
+            }
+            DataInput dataIn = compress ? dis : is;
             for (int i = 0; i < splitLen; i++)
             {
                 //read the className index
-                int index = is.readInt();
+                int index = dataIn.readInt();
                 //get the split class name
                 String splitClassName = distinctSplitClassName[index];
                 Class splitClass = conf.getClassByName(splitClassName);
                 Deserializer d = sf.getDeserializer(splitClass);
-                d.open((InputStream) is);
+                d.open((InputStream) dataIn);
                 wrappedSplits[i] = 
(InputSplit)ReflectionUtils.newInstance(splitClass, conf);
                 d.deserialize(wrappedSplits[i]);
             }
+            if (compress && splitLen > 0) {
+                dis.close();
+            }
         } catch (ClassNotFoundException e) {
             throw new IOException(e);
         }
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public void write(DataOutput os) throws IOException {
         os.writeBoolean(disableCounter);
@@ -262,6 +300,7 @@ public class PigSplit extends InputSplit
         }
         List<String> distinctSplitClassList = new ArrayList<String>();
         distinctSplitClassList.addAll(splitClassNameSet);
+        boolean nonFileSplit = distinctSplitClassList.size() > 1 || 
(!distinctSplitClassList.contains(FILESPLIT_CLASSNAME));
         //write the distinct number of split class name
         os.writeInt(distinctSplitClassList.size());
         //write each classname once
@@ -270,20 +309,43 @@ public class PigSplit extends InputSplit
         }
         SerializationFactory sf = new SerializationFactory(conf);
 
+        if (wrappedSplits.length <= 0) {
+            return;
+        }
+
+        boolean compress = nonFileSplit && conf.getBoolean(
+                PigConfiguration.PIG_COMPRESS_INPUT_SPLITS,
+                PigConfiguration.PIG_COMPRESS_INPUT_SPLITS_DEFAULT);
+        WritableByteArray byteStream = null;
+        Deflater deflater = null;
+        DataOutputStream dos = null;
+        if (compress) {
+            byteStream = new WritableByteArray(16384);
+            deflater = new Deflater(Deflater.BEST_COMPRESSION);
+            dos = new DataOutputStream(new DeflaterOutputStream(byteStream, 
deflater));
+        }
+        DataOutput dataOut = compress ? dos : os;
         for (int i = 0; i < wrappedSplits.length; i++)
         {
             //find out the index of the split class name
             int index = 
distinctSplitClassList.indexOf(wrappedSplits[i].getClass().getName());
-            os.writeInt(index);
+            dataOut.writeInt(index);
             Serializer s = sf.getSerializer(wrappedSplits[i].getClass());
             //Checks if Serializer is NULL or not before calling open() method 
on it.
             if (s == null) {
                 throw new IllegalArgumentException("Could not find Serializer 
for class "+wrappedSplits[i].getClass()+". InputSplits must implement 
Writable.");
             }
-            s.open((OutputStream) os);
+            s.open((OutputStream) dataOut);
             // The correct call sequence for Serializer is, we shall open, 
then serialize, but we shall not close
             s.serialize(wrappedSplits[i]);
         }
+        if (compress) {
+            //Get the compressed serialized bytes and write them
+            dos.close();
+            os.writeInt(byteStream.getLength());
+            os.write(byteStream.getData(), 0, byteStream.getLength());
+            deflater.end();
+        }
 
     }
 
@@ -323,7 +385,7 @@ public class PigSplit extends InputSplit
     public void setMultiInputs(boolean b) {
         isMultiInputs = b;
     }
-    
+
     /**
      * Returns true if the map has multiple inputs, else false
      * @return true if the map has multiple inputs, else false
@@ -331,7 +393,7 @@ public class PigSplit extends InputSplit
     public boolean isMultiInputs() {
         return isMultiInputs;
     }
-    
+
     @Override
     public Configuration getConf() {
         return conf;
@@ -340,20 +402,20 @@ public class PigSplit extends InputSplit
 
     /** (non-Javadoc)
      * @see 
org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)
-     * 
-     * This will be called by 
+     *
+     * This will be called by
      * {@link PigInputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)}
-     * to be used in {@link #write(DataOutput)} for serializing the 
+     * to be used in {@link #write(DataOutput)} for serializing the
      * wrappedSplit
-     * 
-     * This will be called by Hadoop in the backend to set the right Job 
+     *
+     * This will be called by Hadoop in the backend to set the right Job
      * Configuration (hadoop will invoke this method because PigSplit 
implements
      * {@link Configurable} - we need this Configuration in readFields() to
-     * deserialize the wrappedSplit 
+     * deserialize the wrappedSplit
      */
     @Override
     public void setConf(Configuration conf) {
-        this.conf = conf;        
+        this.conf = conf;
     }
 
     // package level access because we don't want LoadFunc implementations
@@ -362,9 +424,9 @@ public class PigSplit extends InputSplit
     int getInputIndex() {
         return inputIndex;
     }
-    
+
     /**
-     * 
+     *
      * @return the number of wrapped splits
      */
     public int getNumPaths() {
@@ -402,7 +464,7 @@ public class PigSplit extends InputSplit
                     wrappedSplits[i].getClass().getName() + "\n   
Locations:\n");
                 for (String location :  wrappedSplits[i].getLocations())
                     st.append("    "+location+"\n");
-                st.append("\n-----------------------\n"); 
+                st.append("\n-----------------------\n");
           }
         } catch (IOException e) {
           return null;
@@ -419,7 +481,7 @@ public class PigSplit extends InputSplit
     public boolean disableCounter() {
         return disableCounter;
     }
-    
+
     public void setCurrentIdx(int idx) {
         this.currentIdx = idx;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1664723&r1=1664722&r2=1664723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Mar  6 21:22:53 2015
@@ -145,12 +145,16 @@ import org.apache.tez.dag.library.vertex
 import org.apache.tez.mapreduce.combine.MRCombiner;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
+import 
org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto.Builder;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
 import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
@@ -631,21 +635,61 @@ public class TezDagBuilder extends TezOp
                 + ", memory=" + vertex.getTaskResource().getMemory()
                 + ", java opts=" + vertex.getTaskLaunchCmdOpts()
                 );
-
         // Right now there can only be one of each of these. Will need to be
         // more generic when there can be more.
         for (POLoad ld : tezOp.getLoaderInfo().getLoads()) {
 
             // TODO: These should get the globalConf, or a merged version that
             // keeps settings like pig.maxCombinedSplitSize
-            
vertex.setLocationHint(VertexLocationHint.create(tezOp.getLoaderInfo().getInputSplitInfo().getTaskLocationHints()));
+            Builder userPayLoadBuilder = 
MRRuntimeProtos.MRInputUserPayloadProto.newBuilder();
+
+            InputSplitInfo inputSplitInfo = 
tezOp.getLoaderInfo().getInputSplitInfo();
+            Map<String, LocalResource> additionalLocalResources = null;
+            int spillThreshold = payloadConf
+                    
.getInt(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD,
+                            
PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD_DEFAULT);
+
+            // Currently inputSplitInfo is always InputSplitInfoMem at this 
point
+            if (inputSplitInfo instanceof InputSplitInfoMem) {
+                MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
+                int splitsSerializedSize = splitsProto.getSerializedSize();
+                if(splitsSerializedSize > spillThreshold) {
+                    payloadConf.setBoolean(
+                            
org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
+                            false);
+                    // Write splits to disk
+                    FileSystem fs = FileSystem.get(payloadConf);
+                    Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
+                    log.info("Writing input splits to " + inputSplitsDir
+                            + " as the serialized size in memory is "
+                            + splitsSerializedSize + ". Configured "
+                            + 
PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD
+                            + " is " + spillThreshold);
+                    inputSplitInfo = MRToTezHelper.convertToInputSplitInfoDisk(
+                            (InputSplitInfoMem)inputSplitInfo, inputSplitsDir, 
payloadConf, fs);
+                    additionalLocalResources = new HashMap<String, 
LocalResource>();
+                    MRToTezHelper.updateLocalResourcesForInputSplits(
+                            fs, inputSplitInfo,
+                            additionalLocalResources);
+                } else {
+                    // Send splits via RPC to AM
+                    userPayLoadBuilder.setSplits(splitsProto);
+                }
+                //Free up memory
+                tezOp.getLoaderInfo().setInputSplitInfo(null);
+            }
+            
userPayLoadBuilder.setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf));
+
+            
vertex.setLocationHint(VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()));
             vertex.addDataSource(ld.getOperatorKey().toString(),
                     
DataSourceDescriptor.create(InputDescriptor.create(MRInput.class.getName())
-                          
.setUserPayload(UserPayload.create(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
-                          
.setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf))
-                          
.setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteString().asReadOnlyByteBuffer()))
-                          .setHistoryText(convertToHistoryText("", 
payloadConf)),
-                    
InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()), 
dag.getCredentials()));
+                            
.setUserPayload(UserPayload.create(userPayLoadBuilder.build().toByteString().asReadOnlyByteBuffer()))
+                            .setHistoryText(convertToHistoryText("", 
payloadConf)),
+                    
InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()),
+                    inputSplitInfo.getNumTasks(),
+                    dag.getCredentials(),
+                    null,
+                    additionalLocalResources));
         }
 
         for (POStore store : stores) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1664723&r1=1664722&r2=1664723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Fri Mar  6 21:22:53 2015
@@ -112,8 +112,8 @@ public class TezLauncher extends Launche
         if (pc.getExecType().isLocal()) {
             pc.getProperties().setProperty(TezConfiguration.TEZ_LOCAL_MODE, 
"true");
             
pc.getProperties().setProperty(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
 "true");
-            pc.getProperties().setProperty("tez.ignore.lib.uris", "true");
-            pc.getProperties().setProperty("tez.am.dag.scheduler.class", 
DAGSchedulerNaturalOrderControlled.class.getName());
+            
pc.getProperties().setProperty(TezConfiguration.TEZ_IGNORE_LIB_URIS, "true");
+            
pc.getProperties().setProperty(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, 
DAGSchedulerNaturalOrderControlled.class.getName());
         }
         Configuration conf = 
ConfigurationUtil.toConfiguration(pc.getProperties(), true);
         if (pc.defaultParallel == -1 && 
!conf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM, true)) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1664723&r1=1664722&r2=1664723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Fri Mar  6 21:22:53 2015
@@ -17,7 +17,9 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez.util;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -25,30 +27,49 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
 
 @InterfaceAudience.Private
 public class MRToTezHelper {
 
     private static final Log LOG = LogFactory.getLog(MRToTezHelper.class);
+    private static final String JOB_SPLIT_RESOURCE_NAME = 
MRJobConfig.JOB_SPLIT;
+    private static final String JOB_SPLIT_METAINFO_RESOURCE_NAME = 
MRJobConfig.JOB_SPLIT_METAINFO;
 
     private static List<String> mrSettingsToRetain = new ArrayList<String>();
 
+    private static List<String> mrSettingsToRemove = new ArrayList<String>();
+
     private MRToTezHelper() {
     }
 
     static {
         populateMRSettingsToRetain();
+        populateMRSettingsToRemove();
     }
 
     private static void populateMRSettingsToRetain() {
@@ -70,6 +91,28 @@ public class MRToTezHelper {
         
mrSettingsToRetain.add(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER);
     }
 
+    private static void populateMRSettingsToRemove() {
+
+        // TODO: Add all unwanted MR config once Tez UI starts showing config
+
+        // FileInputFormat.listStatus() on a task can cause job failure when 
run from Oozie
+        mrSettingsToRemove.add(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
+    }
+
+    private static void removeUnwantedMRSettings(Configuration tezConf) {
+
+        Iterator<Entry<String, String>> iter = tezConf.iterator();
+        while (iter.hasNext()) {
+            Entry<String, String> next = iter.next();
+            for (String mrSetting : mrSettingsToRemove) {
+                if (next.getKey().equals(mrSetting)) {
+                    iter.remove();
+                    break;
+                }
+            }
+        }
+    }
+
     public static TezConfiguration getDAGAMConfFromMRConf(
             Configuration tezConf) {
 
@@ -132,7 +175,12 @@ public class MRToTezHelper {
                     tezConf.get(MRConfiguration.JOB_CREDENTIALS_BINARY));
         }
 
-        //TODO: Strip out all MR settings
+        if (tezConf.get(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN) != null) {
+            
dagAMConf.setIfUnset(TezConfiguration.TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION,
+                    tezConf.get(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN));
+        }
+
+        removeUnwantedMRSettings(dagAMConf);
 
         return dagAMConf;
     }
@@ -154,6 +202,7 @@ public class MRToTezHelper {
         }
         JobControlCompiler.configureCompression(conf);
         convertMRToTezRuntimeConf(conf, mrConf);
+        removeUnwantedMRSettings(conf);
     }
 
     /**
@@ -174,4 +223,67 @@ public class MRToTezHelper {
         }
     }
 
+    /**
+     * Write input splits (job.split and job.splitmetainfo) to disk
+     */
+    public static InputSplitInfoDisk convertToInputSplitInfoDisk(
+            InputSplitInfoMem infoMem, Path inputSplitsDir, JobConf jobConf,
+            FileSystem fs) throws IOException, InterruptedException {
+        LOG.info("Generating new input splits" + ", splitsDir="
+                + inputSplitsDir.toString());
+
+        InputSplit[] splits = infoMem.getNewFormatSplits();
+        JobSplitWriter.createSplitFiles(inputSplitsDir, jobConf, fs, splits);
+
+        return new InputSplitInfoDisk(
+                JobSubmissionFiles.getJobSplitFile(inputSplitsDir),
+                JobSubmissionFiles.getJobSplitMetaFile(inputSplitsDir),
+                splits.length, infoMem.getTaskLocationHints(),
+                jobConf.getCredentials());
+    }
+
+    /**
+     * Exact copy of private method from from 
org.apache.tez.mapreduce.hadoop.MRInputHelpers
+     *
+     * Update provided localResources collection with the required local
+     * resources needed by MapReduce tasks with respect to Input splits.
+     *
+     * @param fs Filesystem instance to access status of splits related files
+     * @param inputSplitInfo Information on location of split files
+     * @param localResources LocalResources collection to be updated
+     * @throws IOException
+     */
+    public static void updateLocalResourcesForInputSplits(
+        FileSystem fs,
+        InputSplitInfo inputSplitInfo,
+        Map<String, LocalResource> localResources) throws IOException {
+      if (localResources.containsKey(JOB_SPLIT_RESOURCE_NAME)) {
+        throw new RuntimeException("LocalResources already contains a"
+            + " resource named " + JOB_SPLIT_RESOURCE_NAME);
+      }
+      if (localResources.containsKey(JOB_SPLIT_METAINFO_RESOURCE_NAME)) {
+        throw new RuntimeException("LocalResources already contains a"
+            + " resource named " + JOB_SPLIT_METAINFO_RESOURCE_NAME);
+      }
+
+      FileStatus splitFileStatus =
+          fs.getFileStatus(inputSplitInfo.getSplitsFile());
+      FileStatus metaInfoFileStatus =
+          fs.getFileStatus(inputSplitInfo.getSplitsMetaInfoFile());
+      localResources.put(JOB_SPLIT_RESOURCE_NAME,
+          LocalResource.newInstance(
+              
ConverterUtils.getYarnUrlFromPath(inputSplitInfo.getSplitsFile()),
+              LocalResourceType.FILE,
+              LocalResourceVisibility.APPLICATION,
+              splitFileStatus.getLen(), 
splitFileStatus.getModificationTime()));
+      localResources.put(JOB_SPLIT_METAINFO_RESOURCE_NAME,
+          LocalResource.newInstance(
+              ConverterUtils.getYarnUrlFromPath(
+                  inputSplitInfo.getSplitsMetaInfoFile()),
+              LocalResourceType.FILE,
+              LocalResourceVisibility.APPLICATION,
+              metaInfoFileStatus.getLen(),
+              metaInfoFileStatus.getModificationTime()));
+    }
+
 }

Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1664723&r1=1664722&r2=1664723&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Fri Mar  6 21:22:53 2015
@@ -62,6 +62,8 @@ public class TestTezAutoParallelism {
     public static void oneTimeSetUp() throws Exception {
         cluster = 
MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_TEZ);
         properties = cluster.getProperties();
+        //Test spilling to disk as tests here have multiple splits
+        
properties.setProperty(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD, 
"10");
         createFiles();
     }
 


Reply via email to