Author: rohini
Date: Fri Mar 6 21:22:53 2015
New Revision: 1664723
URL: http://svn.apache.org/r1664723
Log:
PIG-4443: Write inputsplits in Tez to disk if the size is huge and option to
compress pig input splits (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/PigConfiguration.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1664723&r1=1664722&r2=1664723&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Mar 6 21:22:53 2015
@@ -50,6 +50,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4443: Write inputsplits in Tez to disk if the size is huge and option to
compress pig input splits (rohini)
+
PIG-4447: Pig Cannot handle nullable values (arrays and records) in avro
records (rdsr via daijy)
PIG-4444: Fix unit test failure TestTezAutoParallelism (daijy)
Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1664723&r1=1664722&r2=1664723&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Fri Mar 6 21:22:53 2015
@@ -118,6 +118,21 @@ public class PigConfiguration {
*/
public static final String PIG_TEZ_AUTO_PARALLELISM =
"pig.tez.auto.parallelism";
+ /**
+ * This key is used to configure compression for the pig input splits which
+ * are not FileSplit. Default is false
+ */
+ public static final String PIG_COMPRESS_INPUT_SPLITS =
"pig.compress.input.splits";
+ public static final boolean PIG_COMPRESS_INPUT_SPLITS_DEFAULT = false;
+
+ /**
+ * Serialize input splits to disk if the input splits size exceeds a
+ * threshold to avoid hitting default RPC transfer size limit of 64MB.
+ * Default is 33554432 (32MB)
+ */
+ public static final String PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD =
"pig.tez.input.splits.mem.threshold";
+ public static final int PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD_DEFAULT =
33554432;
+
// Pig UDF profiling settings
/**
* Controls whether execution time of Pig UDFs should be tracked.
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1664723&r1=1664722&r2=1664723&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
Fri Mar 6 21:22:53 2015
@@ -20,7 +20,9 @@ package org.apache.pig.backend.hadoop.ex
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
@@ -30,13 +32,15 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
-import java.util.List;
-import java.util.HashSet;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
-import java.lang.StringBuilder;
+import java.util.Set;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@@ -46,7 +50,10 @@ import org.apache.hadoop.io.serializer.S
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.data.WritableByteArray;
import org.apache.pig.impl.plan.OperatorKey;
/**
@@ -59,19 +66,22 @@ import org.apache.pig.impl.plan.Operator
* wrapped InputSplit.
*/
public class PigSplit extends InputSplit implements Writable, Configurable {
+
+
+ private static String FILESPLIT_CLASSNAME = FileSplit.class.getName();
//The operators to which the tuples from this
//input file are attached. These are the successors
//of the load operator representing this input
private ArrayList<OperatorKey> targetOps;
// index starting from 0 representing the input number
- // So if we have 3 inputs (say for a 3 way join), then the
+ // So if we have 3 inputs (say for a 3 way join), then the
// splits corresponding to the first input will have an index of 0, those
// corresponding to the second will have an index of 1 and so on
// This will be used to get the LoadFunc corresponding to the input
// in PigInputFormat and related code.
private int inputIndex;
-
+
// The real InputSplit this split is wrapping
private InputSplit[] wrappedSplits;
@@ -80,36 +90,36 @@ public class PigSplit extends InputSplit
// This will be used by MergeJoinIndexer to record the split # in the
// index
private int splitIndex;
-
+
// index of current splits being processed
private int currentIdx;
-
+
// the flag indicates this is a multi-input join (i.e. join)
- // so that custom Hadoop counters will be created in the
+ // so that custom Hadoop counters will be created in the
// back-end to track the number of records for each input.
private boolean isMultiInputs = false;
-
+
// the flag indicates the custom Hadoop counter should be disabled.
// This is to prevent the number of counters exceeding the limit.
// This flag is controlled by Pig property "pig.disable.counter" (
// the default value is 'false').
private boolean disableCounter = false;
-
+
/**
* the job Configuration
*/
private Configuration conf;
-
+
/**
* total number of splits - required by skew join
*/
private int totalSplits;
-
+
/**
* total length
*/
private long length = -1;
-
+
/**
* overall locations
*/
@@ -118,8 +128,8 @@ public class PigSplit extends InputSplit
// this seems necessary for Hadoop to instantiate this split on the
// backend
public PigSplit() {}
-
- public PigSplit(InputSplit[] wrappedSplits, int inputIndex,
+
+ public PigSplit(InputSplit[] wrappedSplits, int inputIndex,
List<OperatorKey> targetOps, int splitIndex) {
this.wrappedSplits = wrappedSplits;
this.inputIndex = inputIndex;
@@ -127,30 +137,30 @@ public class PigSplit extends InputSplit
this.splitIndex = splitIndex;
this.currentIdx = 0;
}
-
+
public List<OperatorKey> getTargetOps() {
return new ArrayList<OperatorKey>(targetOps);
}
-
+
/**
- * This method returns the actual InputSplit (as returned by the
+ * This method returns the actual InputSplit (as returned by the
* {@link InputFormat}) which this class is wrapping.
* @return the wrappedSplit
*/
public InputSplit getWrappedSplit() {
return wrappedSplits[currentIdx];
}
-
+
/**
- *
+ *
* @param idx the index into the wrapped splits
* @return the specified wrapped split
*/
public InputSplit getWrappedSplit(int idx) {
return wrappedSplits[idx];
}
-
+
@Override
@SuppressWarnings("unchecked")
public String[] getLocations() throws IOException, InterruptedException {
@@ -200,7 +210,7 @@ public class PigSplit extends InputSplit
}
return length;
}
-
+
/**
* Return the length of a wrapped split
* @param idx the index into the wrapped splits
@@ -210,6 +220,7 @@ public class PigSplit extends InputSplit
return wrappedSplits[idx].getLength();
}
+ @Override
@SuppressWarnings("unchecked")
public void readFields(DataInput is) throws IOException {
disableCounter = is.readBoolean();
@@ -220,32 +231,59 @@ public class PigSplit extends InputSplit
targetOps = (ArrayList<OperatorKey>) readObject(is);
int splitLen = is.readInt();
int distinctSplitClassCount = is.readInt();
+ boolean nonFileSplit = false;
//construct the input split class name list
+
String[] distinctSplitClassName = new String[distinctSplitClassCount];
for (int i = 0; i < distinctSplitClassCount; i++) {
distinctSplitClassName[i] = is.readUTF();
+ if (!distinctSplitClassName[i].equals(FILESPLIT_CLASSNAME)) {
+ nonFileSplit = true;
+ }
}
try {
SerializationFactory sf = new SerializationFactory(conf);
// The correct call sequence for Deserializer is, we shall open,
then deserialize, but we shall not close
wrappedSplits = new InputSplit[splitLen];
+
+ if (splitLen <= 0) {
+ return;
+ }
+
+ // Do not compress if everything is FileSplit as it does not
compress much
+ // but adds few seconds for 30K+ tasks
+ boolean compress = nonFileSplit && conf.getBoolean(
+ PigConfiguration.PIG_COMPRESS_INPUT_SPLITS,
+ PigConfiguration.PIG_COMPRESS_INPUT_SPLITS_DEFAULT);
+ DataInputStream dis = null;
+ if (compress) {
+ int numBytes = is.readInt();
+ byte[] buf = new byte[numBytes];
+ is.readFully(buf, 0, numBytes);
+ dis = new DataInputStream(new InflaterInputStream(new
ByteArrayInputStream(buf)));
+ }
+ DataInput dataIn = compress ? dis : is;
for (int i = 0; i < splitLen; i++)
{
//read the className index
- int index = is.readInt();
+ int index = dataIn.readInt();
//get the split class name
String splitClassName = distinctSplitClassName[index];
Class splitClass = conf.getClassByName(splitClassName);
Deserializer d = sf.getDeserializer(splitClass);
- d.open((InputStream) is);
+ d.open((InputStream) dataIn);
wrappedSplits[i] =
(InputSplit)ReflectionUtils.newInstance(splitClass, conf);
d.deserialize(wrappedSplits[i]);
}
+ if (compress && splitLen > 0) {
+ dis.close();
+ }
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
}
+ @Override
@SuppressWarnings("unchecked")
public void write(DataOutput os) throws IOException {
os.writeBoolean(disableCounter);
@@ -262,6 +300,7 @@ public class PigSplit extends InputSplit
}
List<String> distinctSplitClassList = new ArrayList<String>();
distinctSplitClassList.addAll(splitClassNameSet);
+ boolean nonFileSplit = distinctSplitClassList.size() > 1 ||
(!distinctSplitClassList.contains(FILESPLIT_CLASSNAME));
//write the distinct number of split class name
os.writeInt(distinctSplitClassList.size());
//write each classname once
@@ -270,20 +309,43 @@ public class PigSplit extends InputSplit
}
SerializationFactory sf = new SerializationFactory(conf);
+ if (wrappedSplits.length <= 0) {
+ return;
+ }
+
+ boolean compress = nonFileSplit && conf.getBoolean(
+ PigConfiguration.PIG_COMPRESS_INPUT_SPLITS,
+ PigConfiguration.PIG_COMPRESS_INPUT_SPLITS_DEFAULT);
+ WritableByteArray byteStream = null;
+ Deflater deflater = null;
+ DataOutputStream dos = null;
+ if (compress) {
+ byteStream = new WritableByteArray(16384);
+ deflater = new Deflater(Deflater.BEST_COMPRESSION);
+ dos = new DataOutputStream(new DeflaterOutputStream(byteStream,
deflater));
+ }
+ DataOutput dataOut = compress ? dos : os;
for (int i = 0; i < wrappedSplits.length; i++)
{
//find out the index of the split class name
int index =
distinctSplitClassList.indexOf(wrappedSplits[i].getClass().getName());
- os.writeInt(index);
+ dataOut.writeInt(index);
Serializer s = sf.getSerializer(wrappedSplits[i].getClass());
//Checks if Serializer is NULL or not before calling open() method
on it.
if (s == null) {
throw new IllegalArgumentException("Could not find Serializer
for class "+wrappedSplits[i].getClass()+". InputSplits must implement
Writable.");
}
- s.open((OutputStream) os);
+ s.open((OutputStream) dataOut);
// The correct call sequence for Serializer is, we shall open,
then serialize, but we shall not close
s.serialize(wrappedSplits[i]);
}
+ if (compress) {
+ //Get the compressed serialized bytes and write them
+ dos.close();
+ os.writeInt(byteStream.getLength());
+ os.write(byteStream.getData(), 0, byteStream.getLength());
+ deflater.end();
+ }
}
@@ -323,7 +385,7 @@ public class PigSplit extends InputSplit
public void setMultiInputs(boolean b) {
isMultiInputs = b;
}
-
+
/**
* Returns true if the map has multiple inputs, else false
* @return true if the map has multiple inputs, else false
@@ -331,7 +393,7 @@ public class PigSplit extends InputSplit
public boolean isMultiInputs() {
return isMultiInputs;
}
-
+
@Override
public Configuration getConf() {
return conf;
@@ -340,20 +402,20 @@ public class PigSplit extends InputSplit
/** (non-Javadoc)
* @see
org.apache.hadoop.conf.Configurable#setConf(org.apache.hadoop.conf.Configuration)
- *
- * This will be called by
+ *
+ * This will be called by
* {@link PigInputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)}
- * to be used in {@link #write(DataOutput)} for serializing the
+ * to be used in {@link #write(DataOutput)} for serializing the
* wrappedSplit
- *
- * This will be called by Hadoop in the backend to set the right Job
+ *
+ * This will be called by Hadoop in the backend to set the right Job
* Configuration (hadoop will invoke this method because PigSplit
implements
* {@link Configurable} - we need this Configuration in readFields() to
- * deserialize the wrappedSplit
+ * deserialize the wrappedSplit
*/
@Override
public void setConf(Configuration conf) {
- this.conf = conf;
+ this.conf = conf;
}
// package level access because we don't want LoadFunc implementations
@@ -362,9 +424,9 @@ public class PigSplit extends InputSplit
int getInputIndex() {
return inputIndex;
}
-
+
/**
- *
+ *
* @return the number of wrapped splits
*/
public int getNumPaths() {
@@ -402,7 +464,7 @@ public class PigSplit extends InputSplit
wrappedSplits[i].getClass().getName() + "\n
Locations:\n");
for (String location : wrappedSplits[i].getLocations())
st.append(" "+location+"\n");
- st.append("\n-----------------------\n");
+ st.append("\n-----------------------\n");
}
} catch (IOException e) {
return null;
@@ -419,7 +481,7 @@ public class PigSplit extends InputSplit
public boolean disableCounter() {
return disableCounter;
}
-
+
public void setCurrentIdx(int idx) {
this.currentIdx = idx;
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1664723&r1=1664722&r2=1664723&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Fri Mar 6 21:22:53 2015
@@ -145,12 +145,16 @@ import org.apache.tez.dag.library.vertex
import org.apache.tez.mapreduce.combine.MRCombiner;
import org.apache.tez.mapreduce.committer.MROutputCommitter;
import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
+import
org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto.Builder;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
@@ -631,21 +635,61 @@ public class TezDagBuilder extends TezOp
+ ", memory=" + vertex.getTaskResource().getMemory()
+ ", java opts=" + vertex.getTaskLaunchCmdOpts()
);
-
// Right now there can only be one of each of these. Will need to be
// more generic when there can be more.
for (POLoad ld : tezOp.getLoaderInfo().getLoads()) {
// TODO: These should get the globalConf, or a merged version that
// keeps settings like pig.maxCombinedSplitSize
-
vertex.setLocationHint(VertexLocationHint.create(tezOp.getLoaderInfo().getInputSplitInfo().getTaskLocationHints()));
+ Builder userPayLoadBuilder =
MRRuntimeProtos.MRInputUserPayloadProto.newBuilder();
+
+ InputSplitInfo inputSplitInfo =
tezOp.getLoaderInfo().getInputSplitInfo();
+ Map<String, LocalResource> additionalLocalResources = null;
+ int spillThreshold = payloadConf
+
.getInt(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD,
+
PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD_DEFAULT);
+
+ // Currently inputSplitInfo is always InputSplitInfoMem at this
point
+ if (inputSplitInfo instanceof InputSplitInfoMem) {
+ MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
+ int splitsSerializedSize = splitsProto.getSerializedSize();
+ if(splitsSerializedSize > spillThreshold) {
+ payloadConf.setBoolean(
+
org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
+ false);
+ // Write splits to disk
+ FileSystem fs = FileSystem.get(payloadConf);
+ Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
+ log.info("Writing input splits to " + inputSplitsDir
+ + " as the serialized size in memory is "
+ + splitsSerializedSize + ". Configured "
+ +
PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD
+ + " is " + spillThreshold);
+ inputSplitInfo = MRToTezHelper.convertToInputSplitInfoDisk(
+ (InputSplitInfoMem)inputSplitInfo, inputSplitsDir,
payloadConf, fs);
+ additionalLocalResources = new HashMap<String,
LocalResource>();
+ MRToTezHelper.updateLocalResourcesForInputSplits(
+ fs, inputSplitInfo,
+ additionalLocalResources);
+ } else {
+ // Send splits via RPC to AM
+ userPayLoadBuilder.setSplits(splitsProto);
+ }
+ //Free up memory
+ tezOp.getLoaderInfo().setInputSplitInfo(null);
+ }
+
userPayLoadBuilder.setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf));
+
+
vertex.setLocationHint(VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()));
vertex.addDataSource(ld.getOperatorKey().toString(),
DataSourceDescriptor.create(InputDescriptor.create(MRInput.class.getName())
-
.setUserPayload(UserPayload.create(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
-
.setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf))
-
.setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteString().asReadOnlyByteBuffer()))
- .setHistoryText(convertToHistoryText("",
payloadConf)),
-
InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()),
dag.getCredentials()));
+
.setUserPayload(UserPayload.create(userPayLoadBuilder.build().toByteString().asReadOnlyByteBuffer()))
+ .setHistoryText(convertToHistoryText("",
payloadConf)),
+
InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()),
+ inputSplitInfo.getNumTasks(),
+ dag.getCredentials(),
+ null,
+ additionalLocalResources));
}
for (POStore store : stores) {
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1664723&r1=1664722&r2=1664723&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
Fri Mar 6 21:22:53 2015
@@ -112,8 +112,8 @@ public class TezLauncher extends Launche
if (pc.getExecType().isLocal()) {
pc.getProperties().setProperty(TezConfiguration.TEZ_LOCAL_MODE,
"true");
pc.getProperties().setProperty(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
"true");
- pc.getProperties().setProperty("tez.ignore.lib.uris", "true");
- pc.getProperties().setProperty("tez.am.dag.scheduler.class",
DAGSchedulerNaturalOrderControlled.class.getName());
+
pc.getProperties().setProperty(TezConfiguration.TEZ_IGNORE_LIB_URIS, "true");
+
pc.getProperties().setProperty(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS,
DAGSchedulerNaturalOrderControlled.class.getName());
}
Configuration conf =
ConfigurationUtil.toConfiguration(pc.getProperties(), true);
if (pc.defaultParallel == -1 &&
!conf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM, true)) {
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1664723&r1=1664722&r2=1664723&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
Fri Mar 6 21:22:53 2015
@@ -17,7 +17,9 @@
*/
package org.apache.pig.backend.hadoop.executionengine.tez.util;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -25,30 +27,49 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
@InterfaceAudience.Private
public class MRToTezHelper {
private static final Log LOG = LogFactory.getLog(MRToTezHelper.class);
+ private static final String JOB_SPLIT_RESOURCE_NAME =
MRJobConfig.JOB_SPLIT;
+ private static final String JOB_SPLIT_METAINFO_RESOURCE_NAME =
MRJobConfig.JOB_SPLIT_METAINFO;
private static List<String> mrSettingsToRetain = new ArrayList<String>();
+ private static List<String> mrSettingsToRemove = new ArrayList<String>();
+
private MRToTezHelper() {
}
static {
populateMRSettingsToRetain();
+ populateMRSettingsToRemove();
}
private static void populateMRSettingsToRetain() {
@@ -70,6 +91,28 @@ public class MRToTezHelper {
mrSettingsToRetain.add(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER);
}
+ private static void populateMRSettingsToRemove() {
+
+ // TODO: Add all unwanted MR config once Tez UI starts showing config
+
+ // FileInputFormat.listStatus() on a task can cause job failure when
run from Oozie
+ mrSettingsToRemove.add(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
+ }
+
+ private static void removeUnwantedMRSettings(Configuration tezConf) {
+
+ Iterator<Entry<String, String>> iter = tezConf.iterator();
+ while (iter.hasNext()) {
+ Entry<String, String> next = iter.next();
+ for (String mrSetting : mrSettingsToRemove) {
+ if (next.getKey().equals(mrSetting)) {
+ iter.remove();
+ break;
+ }
+ }
+ }
+ }
+
public static TezConfiguration getDAGAMConfFromMRConf(
Configuration tezConf) {
@@ -132,7 +175,12 @@ public class MRToTezHelper {
tezConf.get(MRConfiguration.JOB_CREDENTIALS_BINARY));
}
- //TODO: Strip out all MR settings
+ if (tezConf.get(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN) != null) {
+
dagAMConf.setIfUnset(TezConfiguration.TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION,
+ tezConf.get(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN));
+ }
+
+ removeUnwantedMRSettings(dagAMConf);
return dagAMConf;
}
@@ -154,6 +202,7 @@ public class MRToTezHelper {
}
JobControlCompiler.configureCompression(conf);
convertMRToTezRuntimeConf(conf, mrConf);
+ removeUnwantedMRSettings(conf);
}
/**
@@ -174,4 +223,67 @@ public class MRToTezHelper {
}
}
+ /**
+ * Write input splits (job.split and job.splitmetainfo) to disk
+ */
+ public static InputSplitInfoDisk convertToInputSplitInfoDisk(
+ InputSplitInfoMem infoMem, Path inputSplitsDir, JobConf jobConf,
+ FileSystem fs) throws IOException, InterruptedException {
+ LOG.info("Generating new input splits" + ", splitsDir="
+ + inputSplitsDir.toString());
+
+ InputSplit[] splits = infoMem.getNewFormatSplits();
+ JobSplitWriter.createSplitFiles(inputSplitsDir, jobConf, fs, splits);
+
+ return new InputSplitInfoDisk(
+ JobSubmissionFiles.getJobSplitFile(inputSplitsDir),
+ JobSubmissionFiles.getJobSplitMetaFile(inputSplitsDir),
+ splits.length, infoMem.getTaskLocationHints(),
+ jobConf.getCredentials());
+ }
+
+ /**
+ * Exact copy of private method from
org.apache.tez.mapreduce.hadoop.MRInputHelpers
+ *
+ * Update provided localResources collection with the required local
+ * resources needed by MapReduce tasks with respect to Input splits.
+ *
+ * @param fs Filesystem instance to access status of splits related files
+ * @param inputSplitInfo Information on location of split files
+ * @param localResources LocalResources collection to be updated
+ * @throws IOException
+ */
+ public static void updateLocalResourcesForInputSplits(
+ FileSystem fs,
+ InputSplitInfo inputSplitInfo,
+ Map<String, LocalResource> localResources) throws IOException {
+ if (localResources.containsKey(JOB_SPLIT_RESOURCE_NAME)) {
+ throw new RuntimeException("LocalResources already contains a"
+ + " resource named " + JOB_SPLIT_RESOURCE_NAME);
+ }
+ if (localResources.containsKey(JOB_SPLIT_METAINFO_RESOURCE_NAME)) {
+ throw new RuntimeException("LocalResources already contains a"
+ + " resource named " + JOB_SPLIT_METAINFO_RESOURCE_NAME);
+ }
+
+ FileStatus splitFileStatus =
+ fs.getFileStatus(inputSplitInfo.getSplitsFile());
+ FileStatus metaInfoFileStatus =
+ fs.getFileStatus(inputSplitInfo.getSplitsMetaInfoFile());
+ localResources.put(JOB_SPLIT_RESOURCE_NAME,
+ LocalResource.newInstance(
+
ConverterUtils.getYarnUrlFromPath(inputSplitInfo.getSplitsFile()),
+ LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION,
+ splitFileStatus.getLen(),
splitFileStatus.getModificationTime()));
+ localResources.put(JOB_SPLIT_METAINFO_RESOURCE_NAME,
+ LocalResource.newInstance(
+ ConverterUtils.getYarnUrlFromPath(
+ inputSplitInfo.getSplitsMetaInfoFile()),
+ LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION,
+ metaInfoFileStatus.getLen(),
+ metaInfoFileStatus.getModificationTime()));
+ }
+
}
Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1664723&r1=1664722&r2=1664723&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Fri Mar 6
21:22:53 2015
@@ -62,6 +62,8 @@ public class TestTezAutoParallelism {
public static void oneTimeSetUp() throws Exception {
cluster =
MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_TEZ);
properties = cluster.getProperties();
+ //Test spilling to disk as tests here have multiple splits
+
properties.setProperty(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD,
"10");
createFiles();
}