Repository: tez
Updated Branches:
  refs/heads/master 3ec953753 -> d0b189af6
Revert "TEZ-3642. Remove ShuffleClientMetrics (zhiyuany)" This reverts commit f7f60385cf5d8d66a1ee02e64d1e671bf1ad8771. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f5d4c36c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f5d4c36c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f5d4c36c Branch: refs/heads/master Commit: f5d4c36cc2fbce8e4075b2f7385faf7a13a862c3 Parents: 3ec9537 Author: Zhiyuan Yang <[email protected]> Authored: Mon Apr 3 11:18:42 2017 -0700 Committer: Zhiyuan Yang <[email protected]> Committed: Mon Apr 3 11:18:42 2017 -0700 ---------------------------------------------------------------------- .../orderedgrouped/FetcherOrderedGrouped.java | 10 +++ .../orderedgrouped/ShuffleClientMetrics.java | 92 ++++++++++++++++++++ .../orderedgrouped/ShuffleScheduler.java | 6 +- .../shuffle/orderedgrouped/TestFetcher.java | 27 ++++-- 4 files changed, 125 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f5d4c36c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 5cad6fc..58ca1e2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -69,6 +69,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { private final TezCounter wrongReduceErrs; private final 
FetchedInputAllocatorOrderedGrouped allocator; private final ShuffleScheduler scheduler; + private final ShuffleClientMetrics metrics; private final ExceptionReporter exceptionReporter; private final int id; private final String logIdentifier; @@ -106,6 +107,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { public FetcherOrderedGrouped(HttpConnectionParams httpConnectionParams, ShuffleScheduler scheduler, FetchedInputAllocatorOrderedGrouped allocator, + ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter, JobTokenSecretManager jobTokenSecretMgr, boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec, @@ -128,6 +130,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { boolean verifyDiskChecksum) { this.scheduler = scheduler; this.allocator = allocator; + this.metrics = metrics; this.exceptionReporter = exceptionReporter; this.mapHost = mapHost; this.currentPartition = this.mapHost.getPartitionId(); @@ -166,6 +169,8 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { @VisibleForTesting protected void fetchNext() throws InterruptedException, IOException { try { + metrics.threadBusy(); + if (localDiskFetchEnabled && mapHost.getHost().equals(localShuffleHost) && mapHost.getPort() == localShufflePort) { setupLocalDiskFetch(mapHost); } else { @@ -175,6 +180,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { } finally { cleanupCurrentConnection(false); scheduler.freeHost(mapHost); + metrics.threadFree(); } } @@ -518,6 +524,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { endTime - startTime, mapOutput, false); // Note successful shuffle remaining.remove(srcAttemptId.toString()); + metrics.successFetch(); return null; } catch (IOException ioe) { if (stopped) { @@ -555,6 +562,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { // Inform the shuffle-scheduler mapOutput.abort(); + metrics.failedFetch(); return new InputAttemptIdentifier[] {srcAttemptId}; } } @@ 
-677,11 +685,13 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(), indexRecord.getRawLength(), (endTime - startTime), mapOutput, true); iter.remove(); + metrics.successFetch(); } catch (IOException e) { if (mapOutput != null) { mapOutput.abort(); } if (!stopped) { + metrics.failedFetch(); ioErrs.increment(1); scheduler.copyFailed(srcAttemptId, host, true, false, true); LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " + http://git-wip-us.apache.org/repos/asf/tez/blob/f5d4c36c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java new file mode 100644 index 0000000..f297dad --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics.MetricsContext; +import org.apache.hadoop.metrics.MetricsRecord; +import org.apache.hadoop.metrics.MetricsUtil; +import org.apache.hadoop.metrics.Updater; +import org.apache.tez.common.TezRuntimeFrameworkConfigs; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.Constants; +import org.apache.tez.runtime.library.common.TezRuntimeUtils; + +class ShuffleClientMetrics implements Updater { + + private MetricsRecord shuffleMetrics = null; + private int numFailedFetches = 0; + private int numSuccessFetches = 0; + private long numBytes = 0; + private int numThreadsBusy = 0; + private final int numCopiers; + + ShuffleClientMetrics(String dagName, String vertexName, int taskIndex, Configuration conf, + String user) { + this.numCopiers = + conf.getInt( + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, + TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT); + + MetricsContext metricsContext = MetricsUtil.getContext(Constants.TEZ); + this.shuffleMetrics = + MetricsUtil.createRecord(metricsContext, "shuffleInput"); + this.shuffleMetrics.setTag("user", user); + this.shuffleMetrics.setTag("dagName", dagName); + this.shuffleMetrics.setTag("taskId", TezRuntimeUtils.getTaskIdentifier(vertexName, taskIndex)); + this.shuffleMetrics.setTag("sessionId", + conf.get( + TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID, + TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID_DEFAULT)); + metricsContext.registerUpdater(this); + } + public synchronized void inputBytes(long numBytes) { + this.numBytes += numBytes; + } + public synchronized void failedFetch() { + ++numFailedFetches; + } + public synchronized void successFetch() { + 
++numSuccessFetches; + } + public synchronized void threadBusy() { + ++numThreadsBusy; + } + public synchronized void threadFree() { + --numThreadsBusy; + } + public void doUpdates(MetricsContext unused) { + synchronized (this) { + shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes); + shuffleMetrics.incrMetric("shuffle_failed_fetches", + numFailedFetches); + shuffleMetrics.incrMetric("shuffle_success_fetches", + numSuccessFetches); + if (numCopiers != 0) { + shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", + 100*((float)numThreadsBusy/numCopiers)); + } else { + shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0); + } + numBytes = 0; + numSuccessFetches = 0; + numFailedFetches = 0; + } + shuffleMetrics.update(); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/f5d4c36c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 953c73e..cce486c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -204,6 +204,7 @@ class ShuffleScheduler { private final HttpConnectionParams httpConnectionParams; private final FetchedInputAllocatorOrderedGrouped allocator; + private final ShuffleClientMetrics shuffleMetrics; private final ExceptionReporter exceptionReporter; private final MergeManager mergeManager; private final JobTokenSecretManager jobTokenSecretManager; @@ -369,6 +370,9 @@ class ShuffleScheduler { 
TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT); this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false); this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); + this.shuffleMetrics = new ShuffleClientMetrics(inputContext.getDAGName(), + inputContext.getTaskVertexName(), inputContext.getTaskIndex(), + this.conf, UserGroupInformation.getCurrentUser().getShortUserName()); SecretKey jobTokenSecret = ShuffleUtils .getJobTokenSecretFromTokenBytes(inputContext .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID)); @@ -1387,7 +1391,7 @@ class ShuffleScheduler { @VisibleForTesting FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) { return new FetcherOrderedGrouped(httpConnectionParams, ShuffleScheduler.this, allocator, - exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength, + shuffleMetrics, exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength, codec, conf, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle, http://git-wip-us.apache.org/repos/asf/tez/blob/f5d4c36c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index a9b57a9..310f1b2 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@ -110,6 +110,7 @@ public class TestFetcher { ShuffleScheduler scheduler = mock(ShuffleScheduler.class); MergeManager merger = mock(MergeManager.class); + ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); Shuffle shuffle = mock(Shuffle.class); InputContext inputContext = mock(InputContext.class); @@ -123,7 +124,7 @@ public class TestFetcher { doReturn(mapsForHost).when(scheduler).getMapsForHost(mapHost); FetcherOrderedGrouped fetcher = - new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, + new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, @@ -141,6 +142,7 @@ public class TestFetcher { Configuration conf = new TezConfiguration(); ShuffleScheduler scheduler = mock(ShuffleScheduler.class); MergeManager merger = mock(MergeManager.class); + ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); Shuffle shuffle = mock(Shuffle.class); InputContext inputContext = mock(InputContext.class); @@ -151,7 +153,7 @@ public class TestFetcher { final boolean DISABLE_LOCAL_FETCH = false; MapHost mapHost = new MapHost(HOST, PORT, 0); FetcherOrderedGrouped fetcher = - new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, + new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, @@ -169,7 +171,7 @@ public class TestFetcher { // if hostname does not match use http mapHost = new MapHost(HOST + "_OTHER", PORT, 0); fetcher = - new 
FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, + new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, @@ -185,7 +187,7 @@ public class TestFetcher { // if port does not match use http mapHost = new MapHost(HOST, PORT + 1, 0); fetcher = - new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, + new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, @@ -200,7 +202,7 @@ public class TestFetcher { //if local fetch is not enabled mapHost = new MapHost(HOST, PORT, 0); - fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, + fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, @@ -219,13 +221,14 @@ public class TestFetcher { Configuration conf = new TezConfiguration(); ShuffleScheduler scheduler = mock(ShuffleScheduler.class); MergeManager merger = mock(MergeManager.class); + ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); Shuffle shuffle = mock(Shuffle.class); InputContext inputContext = mock(InputContext.class); when(inputContext.getCounters()).thenReturn(new TezCounters()); when(inputContext.getSourceVertexName()).thenReturn(""); MapHost host = new MapHost(HOST, PORT, 1); - FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, 
shuffle, null, false, 0, + FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true); @@ -297,6 +300,9 @@ public class TestFetcher { verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false, true); verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true, false, true); + verify(metrics, times(3)).successFetch(); + verify(metrics, times(2)).failedFetch(); + verify(spyFetcher).putBackRemainingMapOutputs(host); verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX)); verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX)); @@ -358,6 +364,7 @@ public class TestFetcher { ShuffleScheduler scheduler = mock(ShuffleScheduler.class); MergeManager merger = mock(MergeManager.class); + ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); Shuffle shuffle = mock(Shuffle.class); InputContext inputContext = mock(InputContext.class); when(inputContext.getCounters()).thenReturn(new TezCounters()); @@ -366,7 +373,7 @@ public class TestFetcher { HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); final MapHost host = new MapHost(HOST, PORT, 1); - FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, + FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true); @@ -442,6 +449,7 @@ public class TestFetcher { ShuffleScheduler 
scheduler = mock(ShuffleScheduler.class); MergeManager merger = mock(MergeManager.class); + ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); Shuffle shuffle = mock(Shuffle.class); TezCounters counters = new TezCounters(); @@ -455,7 +463,7 @@ public class TestFetcher { HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); final MapHost host = new MapHost(HOST, PORT, 1); FetcherOrderedGrouped mockFetcher = - new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, shuffle, jobMgr, + new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, jobMgr, false, 0, null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, @@ -520,10 +528,11 @@ public class TestFetcher { Configuration conf = new TezConfiguration(); ShuffleScheduler scheduler = mock(ShuffleScheduler.class); MergeManager merger = mock(MergeManager.class); + ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); Shuffle shuffle = mock(Shuffle.class); MapHost mapHost = new MapHost(HOST, PORT, 0); FetcherOrderedGrouped fetcher = - new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, + new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
