Author: rohini
Date: Wed Mar 11 05:14:02 2015
New Revision: 1665762
URL: http://svn.apache.org/r1665762
Log:
PIG-4443: Additional patch to fix wrong split being read in case of TEZ-2192
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1665762&r1=1665761&r2=1665762&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Wed Mar 11 05:14:02 2015
@@ -22,16 +22,20 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
@@ -43,6 +47,9 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.StoreFuncInterface;
@@ -173,7 +180,9 @@ public class TezDagBuilder extends TezOp
private Map<String, LocalResource> localResources;
private PigContext pc;
private Configuration globalConf;
+ private FileSystem fs;
private long intermediateTaskInputSize;
+ private Set<String> inputSplitInDiskVertices;
public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
Map<String, LocalResource> localResources) {
@@ -182,6 +191,7 @@ public class TezDagBuilder extends TezOp
this.globalConf =
ConfigurationUtil.toConfiguration(pc.getProperties(), true);
this.localResources = localResources;
this.dag = dag;
+ this.inputSplitInDiskVertices = new HashSet<String>();
try {
// Add credentials from binary token file and get tokens for
namenodes
@@ -192,7 +202,8 @@ public class TezDagBuilder extends TezOp
}
try {
- intermediateTaskInputSize =
HadoopShims.getDefaultBlockSize(FileSystem.get(globalConf),
FileLocalizer.getTemporaryResourcePath(pc));
+ fs = FileSystem.get(globalConf);
+ intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(fs,
FileLocalizer.getTemporaryResourcePath(pc));
} catch (Exception e) {
log.warn("Unable to get the block size for temporary directory,
defaulting to 128MB", e);
intermediateTaskInputSize = 134217728L;
@@ -205,6 +216,40 @@ public class TezDagBuilder extends TezOp
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
}
+ /**
+  * Hack to turn off relocalization till TEZ-2192 is fixed.
+  *
+  * Does nothing when no vertex name was recorded in
+  * inputSplitInDiskVertices. Otherwise creates an empty file named
+  * MRJobConfig.JOB_SPLIT under FileLocalizer.getTemporaryPath(pc) and
+  * registers it, under the key MRJobConfig.JOB_SPLIT, as an
+  * APPLICATION-visibility FILE LocalResource in the task local files of
+  * every DAG vertex whose name is NOT in inputSplitInDiskVertices.
+  *
+  * TezJobCompiler calls this right after dagBuilder.visit(); vertex names
+  * are added to inputSplitInDiskVertices where the splits are written to
+  * disk, so this presumably must run only after the whole plan has been
+  * visited - confirm before reordering.
+  *
+  * NOTE(review): fs is assigned inside a try block in the constructor
+  * whose catch only logs a warning and defaults the block size. If
+  * FileSystem.get(globalConf) threw there, fs would still be null when
+  * fs.create() is reached here - confirm whether that can happen.
+  *
+  * @throws IOException declared by this method; presumably raised by the
+  *         temporary path lookup or the file system calls
+  * @throws RuntimeException if a vertex that does not read splits from
+  *         disk already has a task local file named MRJobConfig.JOB_SPLIT
+  */
+ public void avoidContainerReuseIfInputSplitInDisk() throws IOException {
+ if (!inputSplitInDiskVertices.isEmpty()) {
+ // Create empty job.split file and add as resource to all other
+ // vertices that are not reading splits from disk so that their
+ // containers are not reused by vertices that read splits from disk
+ Path jobSplitFile = new Path(FileLocalizer.getTemporaryPath(pc),
+ MRJobConfig.JOB_SPLIT);
+ FSDataOutputStream out = fs.create(jobSplitFile);
+ out.close();
+ log.info("Creating empty job.split in " + jobSplitFile);
+ FileStatus splitFileStatus = fs.getFileStatus(jobSplitFile);
+ LocalResource localResource = LocalResource.newInstance(
+ ConverterUtils.getYarnUrlFromPath(jobSplitFile),
+ LocalResourceType.FILE,
+ LocalResourceVisibility.APPLICATION,
+ splitFileStatus.getLen(),
+ splitFileStatus.getModificationTime());
+ for (Vertex vertex : dag.getVertices()) {
+ if (!inputSplitInDiskVertices.contains(vertex.getName())) {
+ if (vertex.getTaskLocalFiles().containsKey(
+ MRJobConfig.JOB_SPLIT)) {
+ throw new RuntimeException(
+ "LocalResources already contains a"
+ + " resource named "
+ + MRJobConfig.JOB_SPLIT);
+ }
+ vertex.getTaskLocalFiles().put(MRJobConfig.JOB_SPLIT,
+ localResource);
+ }
+ }
+ }
+ }
+
@Override
public void visitTezOp(TezOperator tezOp) throws VisitorException {
TezOperPlan tezPlan = getPlan();
@@ -658,19 +703,20 @@ public class TezDagBuilder extends TezOp
org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
false);
// Write splits to disk
- FileSystem fs = FileSystem.get(payloadConf);
Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
log.info("Writing input splits to " + inputSplitsDir
+ + " for vertex " + vertex.getName()
+ " as the serialized size in memory is "
+ splitsSerializedSize + ". Configured "
+
PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD
+ " is " + spillThreshold);
- inputSplitInfo = MRToTezHelper.convertToInputSplitInfoDisk(
+ inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk(
(InputSplitInfoMem)inputSplitInfo, inputSplitsDir,
payloadConf, fs);
additionalLocalResources = new HashMap<String,
LocalResource>();
MRToTezHelper.updateLocalResourcesForInputSplits(
fs, inputSplitInfo,
additionalLocalResources);
+ inputSplitInDiskVertices.add(vertex.getName());
} else {
// Send splits via RPC to AM
userPayLoadBuilder.setSplits(splitsProto);
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1665762&r1=1665761&r2=1665762&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
Wed Mar 11 05:14:02 2015
@@ -60,6 +60,7 @@ public class TezJobCompiler {
tezDag.setCredentials(tezPlanNode.getTezOperPlan().getCredentials());
TezDagBuilder dagBuilder = new TezDagBuilder(pigContext,
tezPlanNode.getTezOperPlan(), tezDag, localResources);
dagBuilder.visit();
+ dagBuilder.avoidContainerReuseIfInputSplitInDisk();
return tezDag;
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1665762&r1=1665761&r2=1665762&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
Wed Mar 11 05:14:02 2015
@@ -226,11 +226,9 @@ public class MRToTezHelper {
/**
* Write input splits (job.split and job.splitmetainfo) to disk
*/
- public static InputSplitInfoDisk convertToInputSplitInfoDisk(
+ public static InputSplitInfoDisk writeInputSplitInfoToDisk(
InputSplitInfoMem infoMem, Path inputSplitsDir, JobConf jobConf,
FileSystem fs) throws IOException, InterruptedException {
- LOG.info("Generating new input splits" + ", splitsDir="
- + inputSplitsDir.toString());
InputSplit[] splits = infoMem.getNewFormatSplits();
JobSplitWriter.createSplitFiles(inputSplitsDir, jobConf, fs, splits);