Repository: tez Updated Branches: refs/heads/TEZ-2003 5d6e80367 -> faad3a7db
TEZ-2388. Send dag identifier as part of the fetcher request string. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/faad3a7d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/faad3a7d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/faad3a7d Branch: refs/heads/TEZ-2003 Commit: faad3a7dbc8b273d7a5f1b20514d718055a65b17 Parents: 5d6e803 Author: Siddharth Seth <[email protected]> Authored: Wed Apr 29 08:20:05 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Wed Apr 29 08:20:05 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../tez/runtime/library/common/shuffle/Fetcher.java | 14 ++++++++------ .../runtime/library/common/shuffle/ShuffleUtils.java | 8 +++++--- .../library/common/shuffle/impl/ShuffleManager.java | 2 +- .../ShuffleInputEventHandlerOrderedGrouped.java | 2 +- .../runtime/library/common/shuffle/TestFetcher.java | 6 +++--- 6 files changed, 19 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/faad3a7d/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index d42aaf8..9fc9ed3 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -19,5 +19,6 @@ ALL CHANGES: TEZ-2347. Expose additional information in TaskCommunicatorContext. TEZ-2361. Propagate dag completion to TaskCommunicator. TEZ-2381. Fixes after rebase 04/28. + TEZ-2388. Send dag identifier as part of the fetcher request string. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/faad3a7d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index 3154943..01f5bd6 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -87,6 +87,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { private final FetcherCallback fetcherCallback; private final FetchedInputAllocator inputManager; private final ApplicationId appId; + private final int dagIdentifier; private final String logIdentifier; @@ -125,7 +126,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { private final boolean isDebugEnabled = LOG.isDebugEnabled(); private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params, - FetchedInputAllocator inputManager, ApplicationId appId, + FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf, RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, @@ -137,6 +138,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { this.inputManager = inputManager; this.jobTokenSecretMgr = jobTokenSecretManager; this.appId = appId; + this.dagIdentifier = dagIdentifier; this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>(); this.httpConnectionParams = params; this.conf = conf; @@ -400,7 +402,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> { private HostFetchResult setupConnection(List<InputAttemptIdentifier> attempts) { try { StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host, - port, partition, appId.toString(), httpConnectionParams.isSSLShuffleEnabled()); + port, partition, appId.toString(), dagIdentifier, httpConnectionParams.isSSLShuffleEnabled()); this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts, httpConnectionParams.getKeepAlive()); @@ -900,21 +902,21 @@ public class Fetcher extends CallableWithNdc<FetchResult> { public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams params, FetchedInputAllocator inputManager, - ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, + ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, Configuration conf, boolean localDiskFetchEnabled, String localHostname) { - this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, + this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier, jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled, false, localHostname); } public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams params, FetchedInputAllocator inputManager, - ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, + ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, Configuration conf, RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled, boolean sharedFetchEnabled, String localHostname) { - this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, + this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier, jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname); } http://git-wip-us.apache.org/repos/asf/tez/blob/faad3a7d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 9a8b6b5..b4fd946 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -173,19 +173,21 @@ public class ShuffleUtils { // TODO NEWTEZ handle ssl shuffle public static StringBuilder constructBaseURIForShuffleHandler(String host, - int port, int partition, String appId, boolean sslShuffle) { + int port, int partition, String appId, int dagIdentifier, boolean sslShuffle) { return constructBaseURIForShuffleHandler(host + ":" + String.valueOf(port), - partition, appId, sslShuffle); + partition, appId, dagIdentifier, sslShuffle); } public static StringBuilder constructBaseURIForShuffleHandler(String hostIdentifier, - int partition, String appId, boolean sslShuffle) { + int partition, String appId, int dagIdentifier, boolean sslShuffle) { final String http_protocol = (sslShuffle) ? "https://" : "http://"; StringBuilder sb = new StringBuilder(http_protocol); sb.append(hostIdentifier); sb.append("/"); sb.append("mapOutput?job="); sb.append(appId.replace("application", "job")); + sb.append("&dag="); + sb.append(String.valueOf(dagIdentifier)); sb.append("&reduce="); sb.append(String.valueOf(partition)); sb.append("&map="); http://git-wip-us.apache.org/repos/asf/tez/blob/faad3a7d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 749143a..5075578 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -387,7 +387,7 @@ public class ShuffleManager implements FetcherCallback { } FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this, - httpConnectionParams, inputManager, inputContext.getApplicationId(), + httpConnectionParams, inputManager, inputContext.getApplicationId(), inputContext.getDagIdentifier(), jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, lockDisk, localDiskFetchEnabled, sharedFetchEnabled, inputContext.getExecutionContext().getHostName()); http://git-wip-us.apache.org/repos/asf/tez/blob/faad3a7d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java index 32ac766..9481e65 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java @@ -127,7 +127,7 @@ public class ShuffleInputEventHandlerOrderedGrouped { @VisibleForTesting URI getBaseURI(String host, int port, int partitionId) { StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port, - partitionId, inputContext.getApplicationId().toString(), sslShuffle); + partitionId, inputContext.getApplicationId().toString(), inputContext.getDagIdentifier(), sslShuffle); URI u = URI.create(sb.toString()); return u; } http://git-wip-us.apache.org/repos/asf/tez/blob/faad3a7d/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java index e6f0c4a..081efb2 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java @@ -64,7 +64,7 @@ public class TestFetcher { FetcherCallback fetcherCallback = mock(FetcherCallback.class); Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST); + ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST); builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts)); Fetcher fetcher = spy(builder.build()); @@ -82,7 +82,7 @@ public class TestFetcher { // When disabled use http fetch conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "false"); builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, false, HOST); + ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, false, HOST); builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts)); fetcher = spy(builder.build()); @@ -115,7 +115,7 @@ public class TestFetcher { int partition = 42; FetcherCallback callback = mock(FetcherCallback.class); Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null, - ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST); + ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST); builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts)); Fetcher fetcher = spy(builder.build());
