TEZ-3408. Allow Task Output Files to reside in DAG specific directories for Custom Shuffle Handler (Kuhu Shukla via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0e1d2774 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0e1d2774 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0e1d2774 Branch: refs/heads/master Commit: 0e1d2774307b196a9e763d52327570d8306630cb Parents: 53ea6f5 Author: Jonathan Eagles <[email protected]> Authored: Fri Aug 12 16:45:26 2016 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Fri Aug 12 16:45:26 2016 -0500 ---------------------------------------------------------------------- TEZ-3334-CHANGES.txt | 1 + .../processor/map/TestMapProcessor.java | 4 +- .../apache/tez/auxservices/ShuffleHandler.java | 60 ++++++++++----- .../tez/auxservices/TestShuffleHandler.java | 78 ++++++++++++-------- .../tez/runtime/library/common/Constants.java | 1 + .../runtime/library/common/TezRuntimeUtils.java | 12 ++- .../runtime/library/common/shuffle/Fetcher.java | 14 +++- .../library/common/shuffle/ShuffleUtils.java | 6 ++ .../impl/SimpleFetchedInputAllocator.java | 9 ++- .../orderedgrouped/FetcherOrderedGrouped.java | 14 +++- .../shuffle/orderedgrouped/MergeManager.java | 6 +- .../common/task/local/output/TezTaskOutput.java | 6 +- .../task/local/output/TezTaskOutputFiles.java | 28 ++++--- .../runtime/library/input/UnorderedKVInput.java | 4 +- .../impl/TestSimpleFetchedInputAllocator.java | 5 +- .../TestUnorderedPartitionedKVWriter.java | 9 ++- 16 files changed, 175 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/TEZ-3334-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt index 2122cca..749edfe 100644 --- a/TEZ-3334-CHANGES.txt +++ b/TEZ-3334-CHANGES.txt @@ -4,6 +4,7 @@ Apache Tez Change Log INCOMPATIBLE CHANGES: ALL CHANGES: + TEZ-3408. 
Allow Task Output Files to reside in DAG specific directories for Custom Shuffle Handler TEZ-3238. Shuffle service name should be configureable and should not be hardcoded to ‘mapreduce_shuffle’ TEZ-3390. Package Shuffle Handler as a shaded uber-jar TEZ-3378. Move Shuffle Handler configuration into the Tez namespace http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java index 70f8763..b8f989c 100644 --- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java +++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java @@ -154,7 +154,9 @@ public class TestMapProcessor { task.close(); OutputContext outputContext = task.getOutputContexts().iterator().next(); - TezTaskOutput mapOutputs = new TezTaskOutputFiles(jobConf, outputContext.getUniqueIdentifier()); + TezTaskOutput mapOutputs = new TezTaskOutputFiles( + jobConf, outputContext.getUniqueIdentifier(), + outputContext.getDagIdentifier()); // TODO NEWTEZ FIXME OutputCommitter verification http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java index af50cbf..d11dd2c 100644 --- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java +++ 
b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java @@ -306,14 +306,16 @@ public class ShuffleHandler extends AuxiliaryService { private String user; private Map<String, Shuffle.MapOutputInfo> infoMap; private String jobId; + private String dagId; public ReduceContext(List<String> mapIds, int rId, ChannelHandlerContext context, String usr, Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap, - String jobId) { + String jobId, String dagId) { this.mapIds = mapIds; this.reduceId = rId; + this.dagId = dagId; /** * Atomic count for tracking the no. of map outputs that are yet to * complete. Multiple futureListeners' operationComplete() can decrement @@ -828,7 +830,7 @@ public class ShuffleHandler extends AuxiliaryService { @Override public AttemptPathInfo load(AttemptPathIdentifier key) throws Exception { - String base = getBaseLocation(key.jobId, key.user); + String base = getBaseLocation(key.jobId, key.dagId, key.user); String attemptBase = base + key.attemptId; Path indexFileName = lDirAlloc.getLocalPathToRead( attemptBase + "/" + INDEX_FILE_NAME, conf); @@ -907,16 +909,18 @@ public class ShuffleHandler extends AuxiliaryService { final List<String> mapIds = splitMaps(q.get("map")); final List<String> reduceQ = q.get("reduce"); final List<String> jobQ = q.get("job"); + final List<String> dagIdQ = q.get("dag"); if (LOG.isDebugEnabled()) { LOG.debug("RECV: " + request.getUri() + "\n mapId: " + mapIds + "\n reduceId: " + reduceQ + "\n jobId: " + jobQ + + "\n dagId: " + dagIdQ + "\n keepAlive: " + keepAliveParam); } - if (mapIds == null || reduceQ == null || jobQ == null) { - sendError(ctx, "Required param job, map and reduce", BAD_REQUEST); + if (mapIds == null || reduceQ == null || jobQ == null || dagIdQ == null) { + sendError(ctx, "Required param job, dag, map and reduce", BAD_REQUEST); return; } if (reduceQ.size() != 1 || jobQ.size() != 1) { @@ -933,9 +937,11 @@ public class ShuffleHandler extends AuxiliaryService { } int 
reduceId; String jobId; + String dagId; try { reduceId = Integer.parseInt(reduceQ.get(0)); jobId = jobQ.get(0); + dagId = dagIdQ.get(0); } catch (NumberFormatException e) { sendError(ctx, "Bad reduce parameter", BAD_REQUEST); return; @@ -965,7 +971,7 @@ public class ShuffleHandler extends AuxiliaryService { String user = userRsrc.get(jobId); try { - populateHeaders(mapIds, jobId, user, reduceId, request, + populateHeaders(mapIds, jobId, dagId, user, reduceId, request, response, keepAliveParam, mapOutputInfoMap); } catch(IOException e) { ch.write(response); @@ -977,7 +983,7 @@ public class ShuffleHandler extends AuxiliaryService { ch.write(response); //Initialize one ReduceContext object per messageReceived call ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx, - user, mapOutputInfoMap, jobId); + user, mapOutputInfoMap, jobId, dagId); for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) { ChannelFuture nextMap = sendMap(reduceContext); if(nextMap == null) { @@ -1008,8 +1014,9 @@ public class ShuffleHandler extends AuxiliaryService { try { MapOutputInfo info = reduceContext.getInfoMap().get(mapId); if (info == null) { - info = getMapOutputInfo(mapId, reduceContext.getReduceId(), - reduceContext.getJobId(), reduceContext.getUser()); + info = getMapOutputInfo(reduceContext.dagId, mapId, + reduceContext.getReduceId(), reduceContext.getJobId(), + reduceContext.getUser()); } nextMap = sendMapOutput( reduceContext.getCtx(), @@ -1041,7 +1048,7 @@ public class ShuffleHandler extends AuxiliaryService { return sb.toString(); } - private String getBaseLocation(String jobId, String user) { + private String getBaseLocation(String jobId, String dagId, String user) { final JobID jobID = JobID.forName(jobId); final ApplicationId appID = ApplicationId.newInstance(Long.parseLong(jobID.getJtIdentifier()), @@ -1049,16 +1056,17 @@ public class ShuffleHandler extends AuxiliaryService { final String baseStr = USERCACHE + "/" + user + "/" + APPCACHE + 
"/" - + appID.toString() + "/output" + "/"; + + appID.toString() + "/dag_" + dagId + "/output" + "/"; return baseStr; } - protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, - String jobId, String user) throws IOException { + protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, + int reduce, String jobId, + String user) throws IOException { AttemptPathInfo pathInfo; try { AttemptPathIdentifier identifier = new AttemptPathIdentifier( - jobId, user, mapId); + jobId, dagId, user, mapId); pathInfo = pathCache.get(identifier); if (LOG.isDebugEnabled()) { LOG.debug("Retrieved pathInfo for " + identifier + @@ -1087,13 +1095,17 @@ public class ShuffleHandler extends AuxiliaryService { } protected void populateHeaders(List<String> mapIds, String jobId, - String user, int reduce, HttpRequest request, HttpResponse response, - boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap) + String dagId, String user, + int reduce, HttpRequest request, + HttpResponse response, + boolean keepAliveParam, + Map<String, MapOutputInfo> mapOutputInfoMap) throws IOException { long contentLength = 0; for (String mapId : mapIds) { - MapOutputInfo outputInfo = getMapOutputInfo(mapId, reduce, jobId, user); + MapOutputInfo outputInfo = + getMapOutputInfo(dagId, mapId, reduce, jobId, user); if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) { mapOutputInfoMap.put(mapId, outputInfo); } @@ -1292,11 +1304,14 @@ public class ShuffleHandler extends AuxiliaryService { static class AttemptPathIdentifier { private final String jobId; + private final String dagId; private final String user; private final String attemptId; - public AttemptPathIdentifier(String jobId, String user, String attemptId) { + public AttemptPathIdentifier(String jobId, String dagID, String user, + String attemptId) { this.jobId = jobId; + this.dagId = dagID; this.user = user; this.attemptId = attemptId; } @@ -1315,6 +1330,10 @@ public class ShuffleHandler extends AuxiliaryService { 
if (!attemptId.equals(that.attemptId)) { return false; } + if (dagId != that.dagId) { + return false; + } + if (!jobId.equals(that.jobId)) { return false; } @@ -1325,6 +1344,7 @@ public class ShuffleHandler extends AuxiliaryService { @Override public int hashCode() { int result = jobId.hashCode(); + result = 31 * result + dagId.hashCode(); result = 31 * result + attemptId.hashCode(); return result; } @@ -1332,8 +1352,10 @@ public class ShuffleHandler extends AuxiliaryService { @Override public String toString() { return "AttemptPathIdentifier{" + - "attemptId='" + attemptId + '\'' + - ", jobId='" + jobId + '\'' + + "jobId='" + jobId + '\'' + + ", dagId=" + dagId + + ", user='" + user + '\'' + + ", attemptId='" + attemptId + '\'' + '}'; } } http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java index c2bf361..31e32b4 100644 --- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java +++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java @@ -99,7 +99,6 @@ import org.slf4j.LoggerFactory; public class TestShuffleHandler { static final long MiB = 1024 * 1024; private static final Logger LOG = LoggerFactory.getLogger(TestShuffleHandler.class); - class MockShuffleHandler extends org.apache.tez.auxservices.ShuffleHandler { @Override protected Shuffle getShuffle(final Configuration conf) { @@ -110,15 +109,19 @@ public class TestShuffleHandler { throws IOException { } @Override - protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, - String jobId, String user) throws IOException { + protected MapOutputInfo 
getMapOutputInfo(String dagId, String mapId, + int reduce, String jobId, + String user) + throws IOException { // Do nothing. return null; } @Override protected void populateHeaders(List<String> mapIds, String jobId, - String user, int reduce, HttpRequest request, - HttpResponse response, boolean keepAliveParam, + String dagId, String user, int reduce, + HttpRequest request, + HttpResponse response, + boolean keepAliveParam, Map<String, MapOutputInfo> infoMap) throws IOException { // Do nothing. } @@ -232,14 +235,18 @@ public class TestShuffleHandler { // replace the shuffle handler with one stubbed for testing return new Shuffle(conf) { @Override - protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, - String jobId, String user) throws IOException { + protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, + int reduce, String jobId, + String user) + throws IOException { return null; } @Override protected void populateHeaders(List<String> mapIds, String jobId, - String user, int reduce, HttpRequest request, - HttpResponse response, boolean keepAliveParam, + String dagId, String user, int reduce, + HttpRequest request, + HttpResponse response, + boolean keepAliveParam, Map<String, MapOutputInfo> infoMap) throws IOException { // Only set response headers and skip everything else // send some dummy value for content-length @@ -294,7 +301,7 @@ public class TestShuffleHandler { // then closing the connection URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) - + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0"); + + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0"); HttpURLConnection conn = (HttpURLConnection)url.openConnection(); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME); @@ -327,8 +334,10 @@ public class TestShuffleHandler { // replace the shuffle handler with one stubbed for testing return 
new Shuffle(conf) { @Override - protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, - String jobId, String user) throws IOException { + protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, + int reduce, + String jobId, String user) + throws IOException { return null; } @Override @@ -339,9 +348,12 @@ public class TestShuffleHandler { @Override protected void populateHeaders(List<String> mapIds, String jobId, - String user, int reduce, HttpRequest request, - HttpResponse response, boolean keepAliveParam, - Map<String, MapOutputInfo> infoMap) throws IOException { + String dagId, String user, + int reduce, HttpRequest request, + HttpResponse response, + boolean keepAliveParam, + Map<String, MapOutputInfo> infoMap) + throws IOException { // Send some dummy data (populate content length details) ShuffleHeader header = new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1); @@ -409,7 +421,7 @@ public class TestShuffleHandler { + shuffleHandler.getConfig().get( ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY); URL url = - new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&" + new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&" + "map=attempt_12345_1_m_1_0"); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, @@ -429,7 +441,7 @@ public class TestShuffleHandler { // For keepAlive via URL url = - new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&" + new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&" + "map=attempt_12345_1_m_1_0&keepAlive=true"); conn = (HttpURLConnection) url.openConnection(); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, @@ -465,7 +477,7 @@ public class TestShuffleHandler { + shuffleHandler.getConfig().get( ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY); URL url = - new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&" + new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&" + 
"map=attempt_12345_1_m_1_0"); conn = (HttpURLConnection) url.openConnection(); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, @@ -503,7 +515,7 @@ public class TestShuffleHandler { // then closing the connection URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) - + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0"); + + "/mapOutput?job=job_12345_1&&dag=1reduce=1&map=attempt_12345_1_m_1_0"); for (int i = 0; i < failureNum; ++i) { HttpURLConnection conn = (HttpURLConnection)url.openConnection(); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, @@ -536,15 +548,19 @@ public class TestShuffleHandler { // replace the shuffle handler with one stubbed for testing return new Shuffle(conf) { @Override - protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, - String jobId, String user) throws IOException { + protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, + int reduce, String jobId, + String user) + throws IOException { // Do nothing. return null; } @Override protected void populateHeaders(List<String> mapIds, String jobId, - String user, int reduce, HttpRequest request, - HttpResponse response, boolean keepAliveParam, + String dagId, String user, int reduce, + HttpRequest request, + HttpResponse response, + boolean keepAliveParam, Map<String, MapOutputInfo> infoMap) throws IOException { // Do nothing. 
} @@ -585,7 +601,7 @@ public class TestShuffleHandler { for (int i = 0; i < connAttempts; i++) { String URLstring = "http://127.0.0.1:" + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) - + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_" + + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_" + i + "_0"; URL url = new URL(URLstring); conns[i] = (HttpURLConnection)url.openConnection(); @@ -685,7 +701,7 @@ public class TestShuffleHandler { "http://127.0.0.1:" + shuffleHandler.getConfig().get( ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) - + "/mapOutput?job=job_12345_0001&reduce=" + reducerId + + "/mapOutput?job=job_12345_0001&dag=1&reduce=" + reducerId + "&map=attempt_12345_1_m_1_0"); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, @@ -722,7 +738,8 @@ public class TestShuffleHandler { StringUtils.join(Path.SEPARATOR, new String[] { logDir.getAbsolutePath(), ShuffleHandler.USERCACHE, user, - ShuffleHandler.APPCACHE, appId, "output", appAttemptId }); + ShuffleHandler.APPCACHE, appId,"dag_1/" + "output", + appAttemptId }); File appAttemptDir = new File(attemptDir); appAttemptDir.mkdirs(); System.out.println(appAttemptDir.getAbsolutePath()); @@ -924,7 +941,8 @@ public class TestShuffleHandler { Token<JobTokenIdentifier> jt) throws IOException { URL url = new URL("http://127.0.0.1:" + shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) - + "/mapOutput?job=job_12345_0001&reduce=0&map=attempt_12345_1_m_1_0"); + + "/mapOutput?job=job_12345_0001&dag=1&reduce=0" + + "&map=attempt_12345_1_m_1_0"); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); String encHash = SecureShuffleUtils.hashFromString( SecureShuffleUtils.buildMsgFrom(url), @@ -967,7 +985,7 @@ public class TestShuffleHandler { return new Shuffle(conf) { @Override protected void populateHeaders(List<String> mapIds, - String outputBaseStr, String user, int reduce, + 
String outputBaseStr, String dagId, String user, int reduce, HttpRequest request, HttpResponse response, boolean keepAliveParam, Map<String, MapOutputInfo> infoMap) throws IOException { @@ -1021,7 +1039,7 @@ public class TestShuffleHandler { "http://127.0.0.1:" + shuffleHandler.getConfig().get( ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) - + "/mapOutput?job=job_12345_0001&reduce=" + reducerId + + "/mapOutput?job=job_12345_0001&dag=1&reduce=" + reducerId + "&map=attempt_12345_1_m_1_0"); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, @@ -1116,7 +1134,7 @@ public class TestShuffleHandler { Mockito.doAnswer(new Answer() { @Override public Object answer(InvocationOnMock invocation) throws Throwable { - String uri = "/mapOutput?job=job_12345_1&reduce=1"; + String uri = "/mapOutput?job=job_12345_1&dag=1&reduce=1"; for (int i = 0; i < 100; i++) uri = uri.concat("&map=attempt_12345_1_m_" + i + "_0"); return uri; http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java index 827cafe..81921b2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java @@ -25,6 +25,7 @@ public class Constants { // TODO NEWTEZ Check which of these constants are expecting specific pieces of information which are being removed - like taskAttemptId public static final String TEZ = "tez"; + public static final String DAG_PREFIX = "dag_"; public static final String MAP_OUTPUT_FILENAME_STRING = "file.out"; public static final String 
MAP_OUTPUT_INDEX_SUFFIX_STRING = ".index"; http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java index 819423f..c0b7210 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java @@ -139,14 +139,18 @@ public class TezRuntimeUtils { } return partitioner; } - - public static TezTaskOutput instantiateTaskOutputManager(Configuration conf, OutputContext outputContext) { + + public static TezTaskOutput instantiateTaskOutputManager( + Configuration conf, OutputContext outputContext) { Class<?> clazz = conf.getClass(Constants.TEZ_RUNTIME_TASK_OUTPUT_MANAGER, TezTaskOutputFiles.class); try { - Constructor<?> ctor = clazz.getConstructor(Configuration.class, String.class); + Constructor<?> ctor = clazz.getConstructor(Configuration.class, String + .class, int.class); ctor.setAccessible(true); - TezTaskOutput instance = (TezTaskOutput) ctor.newInstance(conf, outputContext.getUniqueIdentifier()); + TezTaskOutput instance = (TezTaskOutput) ctor.newInstance(conf, + outputContext.getUniqueIdentifier(), + outputContext.getDagIdentifier()); return instance; } catch (Exception e) { throw new TezUncheckedException( http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index 6cbff94..896f532 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -666,10 +666,16 @@ public class Fetcher extends CallableWithNdc<FetchResult> { return idxRecord; } - private static final String getMapOutputFile(String pathComponent) { - return Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR - + pathComponent + Path.SEPARATOR - + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING; + private final String getMapOutputFile(String pathComponent) { + String outputPath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR + + pathComponent + Path.SEPARATOR + + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING; + + if(ShuffleUtils.isTezShuffleHandler(conf)) { + return Constants.DAG_PREFIX + this.dagIdentifier + Path.SEPARATOR + + outputPath; + } + return outputPath; } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index fa8533c..5d2444c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -620,5 +620,11 @@ public class ShuffleUtils { sslFactory); return httpConnParams; } + + public static boolean isTezShuffleHandler(Configuration config) { + return config.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, + 
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT). + contains("tez"); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java index 68c4781..f939cd1 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/SimpleFetchedInputAllocator.java @@ -65,15 +65,18 @@ public class SimpleFetchedInputAllocator implements FetchedInputAllocator, private volatile long usedMemory = 0; - public SimpleFetchedInputAllocator(String srcNameTrimmed, String uniqueIdentifier, Configuration conf, - long maxTaskAvailableMemory, long memoryAvailable) { + public SimpleFetchedInputAllocator(String srcNameTrimmed, + String uniqueIdentifier, int dagID, + Configuration conf, + long maxTaskAvailableMemory, + long memoryAvailable) { this.srcNameTrimmed = srcNameTrimmed; this.conf = conf; this.maxAvailableTaskMemory = maxTaskAvailableMemory; this.initialMemoryAvailable = memoryAvailable; this.fileNameAllocator = new TezTaskOutputFiles(conf, - uniqueIdentifier); + uniqueIdentifier, dagID); this.localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS); // Setup configuration http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git 
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index bcb75d2..b6599dc 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -717,9 +717,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { throws IOException { LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS); suffix = suffix != null ? suffix : ""; - - String pathFromLocalDir = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR + - pathComponent + Path.SEPARATOR + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix; + String outputPath = Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + Path.SEPARATOR + + pathComponent + Path.SEPARATOR + + Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + suffix; + String pathFromLocalDir = getPathForLocalDir(outputPath); return localDirAllocator.getLocalPathToRead(pathFromLocalDir.toString(), conf); } @@ -750,5 +751,12 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { remaining.put(id.toString(), id); } } + + private String getPathForLocalDir(String suffix) { + if(ShuffleUtils.isTezShuffleHandler(conf)) { + return Constants.DAG_PREFIX + dagId + Path.SEPARATOR + suffix; + } + return suffix; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java 
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 26bdca7..a6f554c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -177,8 +177,10 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped { this.reduceCombineInputCounter = reduceCombineInputCounter; this.spilledRecordsCounter = spilledRecordsCounter; this.mergedMapOutputsCounter = mergedMapOutputsCounter; - this.mapOutputFile = new TezTaskOutputFiles(conf, inputContext.getUniqueIdentifier()); - + this.mapOutputFile = new TezTaskOutputFiles(conf, + inputContext.getUniqueIdentifier(), + inputContext.getDagIdentifier()); + this.localFS = localFS; this.rfs = ((LocalFileSystem)localFS).getRaw(); http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java index c41e4a6..414f3d0 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.tez.runtime.library.common.Constants; /** * Manipulate the working area for the transient store 
for components in tez-runtime-library @@ -37,6 +38,7 @@ public abstract class TezTaskOutput { protected final Configuration conf; protected final String uniqueId; + protected final String dagId; /** * @param conf the configuration from which local-dirs will be picked up @@ -45,10 +47,12 @@ public abstract class TezTaskOutput { * container is used for multiple tasks, this id should be unique for inputs / * outputs spanning across tasks. This is also expected to be unique across all * tasks for a vertex. + * @param dagID DAG identifier for the specific job */ - public TezTaskOutput(Configuration conf, String uniqueId) { + public TezTaskOutput(Configuration conf, String uniqueId, int dagID) { this.conf = conf; this.uniqueId = uniqueId; + this.dagId = Constants.DAG_PREFIX + dagID + Path.SEPARATOR; } /** http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java index 1e6fca3..97a2509 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.runtime.library.common.Constants; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; /** * Manipulate the working area for the transient store for components in tez-runtime-library @@ -40,9 +41,9 @@ import 
org.apache.tez.runtime.library.common.Constants; @InterfaceAudience.Private @InterfaceStability.Unstable public class TezTaskOutputFiles extends TezTaskOutput { - - public TezTaskOutputFiles(Configuration conf, String uniqueId) { - super(conf, uniqueId); + + public TezTaskOutputFiles(Configuration conf, String uniqueId, int dagID) { + super(conf, uniqueId, dagID); } private static final Logger LOG = LoggerFactory.getLogger(TezTaskOutputFiles.class); @@ -60,7 +61,8 @@ public class TezTaskOutputFiles extends TezTaskOutput { new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS); /* - * ${appDir}/output/${uniqueId} + * if service_id = mapreduce_shuffle then "${appDir}/output/${uniqueId}" + * if service_id = tez_shuffle then "${appDir}/dagId/output/${uniqueId}" */ private Path getAttemptOutputDir() { if (LOG.isDebugEnabled()) { @@ -68,7 +70,8 @@ public class TezTaskOutputFiles extends TezTaskOutput { + Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR + "/" + uniqueId); } - return new Path(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, uniqueId); + String dagPath = getDagOutputDir(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR); + return new Path(dagPath, uniqueId); } @@ -201,7 +204,8 @@ public class TezTaskOutputFiles extends TezTaskOutput { public Path getSpillFileForWrite(int spillNumber, long size) throws IOException { Preconditions.checkArgument(spillNumber >= 0, "Provide a valid spill number " + spillNumber); - Path taskAttemptDir = new Path(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, + String dagPath = getDagOutputDir(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR); + Path taskAttemptDir = new Path(dagPath, String.format(SPILL_FILE_DIR_PATTERN, uniqueId, spillNumber)); Path outputDir = new Path(taskAttemptDir, Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING); return lDirAlloc.getLocalPathForWrite(outputDir.toString(), size, conf); @@ -222,8 +226,9 @@ public class TezTaskOutputFiles extends TezTaskOutput { public Path getSpillIndexFileForWrite(int spillNumber, long size) throws IOException { 
Preconditions.checkArgument(spillNumber >= 0, "Provide a valid spill number " + spillNumber); - Path taskAttemptDir = new Path(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, - String.format(SPILL_FILE_DIR_PATTERN, uniqueId, spillNumber)); + String dagPath = getDagOutputDir(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR); + Path taskAttemptDir = new Path(dagPath, String.format( + SPILL_FILE_DIR_PATTERN, uniqueId, spillNumber)); Path outputDir = new Path(taskAttemptDir, Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING); return lDirAlloc.getLocalPathForWrite(outputDir.toString(), size, conf); @@ -247,7 +252,8 @@ public class TezTaskOutputFiles extends TezTaskOutput { @Override public Path getInputFileForWrite(int srcIdentifier, int spillNum, long size) throws IOException { - return lDirAlloc.getLocalPathForWrite(getSpillFileName(srcIdentifier, spillNum), size, conf); + String dagPath = getDagOutputDir(getSpillFileName(srcIdentifier, spillNum)); + return lDirAlloc.getLocalPathForWrite(dagPath, size, conf); } /** @@ -265,4 +271,8 @@ public class TezTaskOutputFiles extends TezTaskOutput { public String getSpillFileName(int srcId, int spillNum) { return String.format(SPILL_FILE_PATTERN, uniqueId, srcId, spillNum); } + + private String getDagOutputDir(String child) { + return ShuffleUtils.isTezShuffleHandler(conf) ? 
dagId.concat(child) : child; + } } http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java index ec9a191..2d6683a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java @@ -134,7 +134,9 @@ public class UnorderedKVInput extends AbstractLogicalInput { TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT); this.inputManager = new SimpleFetchedInputAllocator( - TezUtilsInternal.cleanVertexName(getContext().getSourceVertexName()), getContext().getUniqueIdentifier(), conf, + TezUtilsInternal.cleanVertexName(getContext().getSourceVertexName()), + getContext().getUniqueIdentifier(), + getContext().getDagIdentifier(), conf, getContext().getTotalMemoryAvailableToTask(), memoryUpdateCallbackHandler.getMemoryAssigned()); http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java index 2f89b0f..1b63b17 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java +++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestSimpleFetchedInputAllocator.java @@ -52,8 +52,9 @@ public class TestSimpleFetchedInputAllocator { long inMemThreshold = (long) (bufferPercent * jvmMax); LOG.info("InMemThreshold: " + inMemThreshold); - SimpleFetchedInputAllocator inputManager = new SimpleFetchedInputAllocator("srcName", UUID.randomUUID().toString(), - conf, Runtime.getRuntime().maxMemory(), inMemThreshold); + SimpleFetchedInputAllocator inputManager = new SimpleFetchedInputAllocator( + "srcName", UUID.randomUUID().toString(), 123, conf, + Runtime.getRuntime().maxMemory(), inMemThreshold); long requestSize = (long) (0.4f * inMemThreshold); long compressedSize = 1l; http://git-wip-us.apache.org/repos/asf/tez/blob/0e1d2774/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java index 4a0d1d5..031b44d 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java @@ -258,6 +258,7 @@ public class TestUnorderedPartitionedKVWriter { ApplicationId appId = ApplicationId.newInstance(10000000, 1); TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); + int dagId = 1; OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf); Random random = new Random(); @@ -391,7 +392,7 @@ public class TestUnorderedPartitionedKVWriter { // Verify the data // Verify the actual data - TezTaskOutput 
taskOutput = new TezTaskOutputFiles(conf, uniqueId); + TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId); Path outputFilePath = kvWriter.finalOutPath; Path spillFilePath = kvWriter.finalIndexPath; if (numRecordsWritten > 0) { @@ -526,6 +527,7 @@ public class TestUnorderedPartitionedKVWriter { ApplicationId appId = ApplicationId.newInstance(10000000, 1); TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); + int dagId = 1; OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf); Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class, @@ -690,7 +692,7 @@ public class TestUnorderedPartitionedKVWriter { verify(outputContext, atLeast(1)).notifyProgress(); // Verify if all spill files are available. - TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId); + TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId); if (numRecordsWritten > 0) { int numSpills = kvWriter.numSpills.get(); @@ -710,6 +712,7 @@ public class TestUnorderedPartitionedKVWriter { ApplicationId appId = ApplicationId.newInstance(10000000, 1); TezCounters counters = new TezCounters(); String uniqueId = UUID.randomUUID().toString(); + int dagId = 1; OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId, defaultConf); Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class, @@ -847,7 +850,7 @@ public class TestUnorderedPartitionedKVWriter { } // Verify the actual data - TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId); + TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId, dagId); Path outputFilePath = kvWriter.finalOutPath; Path spillFilePath = kvWriter.finalIndexPath;
