TEZ-3642. Remove ShuffleClientMetrics (zhiyuany)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f7f60385 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f7f60385 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f7f60385 Branch: refs/heads/TEZ-1190 Commit: f7f60385cf5d8d66a1ee02e64d1e671bf1ad8771 Parents: 2bdf58a Author: Zhiyuan Yang <[email protected]> Authored: Mon Mar 27 18:53:45 2017 -0700 Committer: Zhiyuan Yang <[email protected]> Committed: Mon Mar 27 18:53:45 2017 -0700 ---------------------------------------------------------------------- .../orderedgrouped/FetcherOrderedGrouped.java | 10 --- .../orderedgrouped/ShuffleClientMetrics.java | 92 -------------------- .../orderedgrouped/ShuffleScheduler.java | 6 +- .../shuffle/orderedgrouped/TestFetcher.java | 27 ++---- 4 files changed, 10 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f7f60385/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java index 58ca1e2..5cad6fc 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java @@ -69,7 +69,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { private final TezCounter wrongReduceErrs; private final FetchedInputAllocatorOrderedGrouped allocator; private final ShuffleScheduler scheduler; - private final ShuffleClientMetrics metrics; private final ExceptionReporter exceptionReporter; private final int id; private final String logIdentifier; @@ -107,7 +106,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { public FetcherOrderedGrouped(HttpConnectionParams httpConnectionParams, ShuffleScheduler scheduler, FetchedInputAllocatorOrderedGrouped allocator, - ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter, JobTokenSecretManager jobTokenSecretMgr, boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec, @@ -130,7 +128,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { boolean verifyDiskChecksum) { this.scheduler = scheduler; this.allocator = allocator; - this.metrics = metrics; this.exceptionReporter = exceptionReporter; this.mapHost = mapHost; this.currentPartition = this.mapHost.getPartitionId(); @@ -169,8 +166,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { @VisibleForTesting protected void fetchNext() throws InterruptedException, IOException { try { - metrics.threadBusy(); - if (localDiskFetchEnabled && mapHost.getHost().equals(localShuffleHost) && mapHost.getPort() == localShufflePort) { setupLocalDiskFetch(mapHost); } else { @@ -180,7 +175,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { } finally { cleanupCurrentConnection(false); scheduler.freeHost(mapHost); - metrics.threadFree(); } } @@ -524,7 +518,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { endTime - startTime, mapOutput, false); // Note successful shuffle remaining.remove(srcAttemptId.toString()); - metrics.successFetch(); return null; } catch (IOException ioe) { if (stopped) { @@ -562,7 +555,6 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { // Inform the shuffle-scheduler mapOutput.abort(); - metrics.failedFetch(); return new InputAttemptIdentifier[] {srcAttemptId}; } } @@ -685,13 +677,11 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> { scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(), indexRecord.getRawLength(), (endTime - startTime), mapOutput, true); iter.remove(); - metrics.successFetch(); } catch (IOException e) { if (mapOutput != null) { mapOutput.abort(); } if (!stopped) { - metrics.failedFetch(); ioErrs.increment(1); scheduler.copyFailed(srcAttemptId, host, true, false, true); LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " + http://git-wip-us.apache.org/repos/asf/tez/blob/f7f60385/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java deleted file mode 100644 index f297dad..0000000 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleClientMetrics.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics.MetricsContext; -import org.apache.hadoop.metrics.MetricsRecord; -import org.apache.hadoop.metrics.MetricsUtil; -import org.apache.hadoop.metrics.Updater; -import org.apache.tez.common.TezRuntimeFrameworkConfigs; -import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; -import org.apache.tez.runtime.library.common.Constants; -import org.apache.tez.runtime.library.common.TezRuntimeUtils; - -class ShuffleClientMetrics implements Updater { - - private MetricsRecord shuffleMetrics = null; - private int numFailedFetches = 0; - private int numSuccessFetches = 0; - private long numBytes = 0; - private int numThreadsBusy = 0; - private final int numCopiers; - - ShuffleClientMetrics(String dagName, String vertexName, int taskIndex, Configuration conf, - String user) { - this.numCopiers = - conf.getInt( - TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES, - TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES_DEFAULT); - - MetricsContext metricsContext = MetricsUtil.getContext(Constants.TEZ); - this.shuffleMetrics = - MetricsUtil.createRecord(metricsContext, "shuffleInput"); - this.shuffleMetrics.setTag("user", user); - this.shuffleMetrics.setTag("dagName", dagName); - this.shuffleMetrics.setTag("taskId", TezRuntimeUtils.getTaskIdentifier(vertexName, taskIndex)); - this.shuffleMetrics.setTag("sessionId", - conf.get( - TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID, - TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID_DEFAULT)); - metricsContext.registerUpdater(this); - } - public synchronized void inputBytes(long numBytes) { - this.numBytes += numBytes; - } - public synchronized void failedFetch() { - ++numFailedFetches; - } - public synchronized void successFetch() { - ++numSuccessFetches; - } - public synchronized void threadBusy() { - ++numThreadsBusy; - } - public synchronized void threadFree() { - --numThreadsBusy; - } - public void doUpdates(MetricsContext unused) { - synchronized (this) { - shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes); - shuffleMetrics.incrMetric("shuffle_failed_fetches", - numFailedFetches); - shuffleMetrics.incrMetric("shuffle_success_fetches", - numSuccessFetches); - if (numCopiers != 0) { - shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", - 100*((float)numThreadsBusy/numCopiers)); - } else { - shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0); - } - numBytes = 0; - numSuccessFetches = 0; - numFailedFetches = 0; - } - shuffleMetrics.update(); - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/f7f60385/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index cce486c..953c73e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -204,7 +204,6 @@ class ShuffleScheduler { private final HttpConnectionParams httpConnectionParams; private final FetchedInputAllocatorOrderedGrouped allocator; - private final ShuffleClientMetrics shuffleMetrics; private final ExceptionReporter exceptionReporter; private final MergeManager mergeManager; private final JobTokenSecretManager jobTokenSecretManager; @@ -370,9 +369,6 @@ class ShuffleScheduler { TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT); this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false); this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); - this.shuffleMetrics = new ShuffleClientMetrics(inputContext.getDAGName(), - inputContext.getTaskVertexName(), inputContext.getTaskIndex(), - this.conf, UserGroupInformation.getCurrentUser().getShortUserName()); SecretKey jobTokenSecret = ShuffleUtils .getJobTokenSecretFromTokenBytes(inputContext .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID)); @@ -1391,7 +1387,7 @@ class ShuffleScheduler { @VisibleForTesting FetcherOrderedGrouped constructFetcherForHost(MapHost mapHost) { return new FetcherOrderedGrouped(httpConnectionParams, ShuffleScheduler.this, allocator, - shuffleMetrics, exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength, + exceptionReporter, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength, codec, conf, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, applicationId, dagId, asyncHttp, sslShuffle, http://git-wip-us.apache.org/repos/asf/tez/blob/f7f60385/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java index 310f1b2..a9b57a9 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java @@ -110,7 +110,6 @@ public class TestFetcher { ShuffleScheduler scheduler = mock(ShuffleScheduler.class); MergeManager merger = mock(MergeManager.class); - ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); Shuffle shuffle = mock(Shuffle.class); InputContext inputContext = mock(InputContext.class); @@ -124,7 +123,7 @@ public class TestFetcher { doReturn(mapsForHost).when(scheduler).getMapsForHost(mapHost); FetcherOrderedGrouped fetcher = - new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, @@ -142,7 +141,6 @@ public class TestFetcher { Configuration conf = new TezConfiguration(); ShuffleScheduler scheduler = mock(ShuffleScheduler.class); MergeManager merger = mock(MergeManager.class); - ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); Shuffle shuffle = mock(Shuffle.class); InputContext inputContext = mock(InputContext.class); @@ -153,7 +151,7 @@ public class TestFetcher { final boolean DISABLE_LOCAL_FETCH = false; MapHost mapHost = new MapHost(HOST, PORT, 0); FetcherOrderedGrouped fetcher = - new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, @@ -171,7 +169,7 @@ public class TestFetcher { // if hostname does not match use http mapHost = new MapHost(HOST + "_OTHER", PORT, 0); fetcher = - new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, @@ -187,7 +185,7 @@ public class TestFetcher { // if port does not match use http mapHost = new MapHost(HOST, PORT + 1, 0); fetcher = - new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, @@ -202,7 +200,7 @@ public class TestFetcher { //if local fetch is not enabled mapHost = new MapHost(HOST, PORT, 0); - fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, @@ -221,14 +219,13 @@ public class TestFetcher { Configuration conf = new TezConfiguration(); ShuffleScheduler scheduler = mock(ShuffleScheduler.class); MergeManager merger = mock(MergeManager.class); - ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); Shuffle shuffle = mock(Shuffle.class); InputContext inputContext = mock(InputContext.class); when(inputContext.getCounters()).thenReturn(new TezCounters()); when(inputContext.getSourceVertexName()).thenReturn(""); MapHost host = new MapHost(HOST, PORT, 1); - FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true); @@ -300,9 +297,6 @@ public class TestFetcher { verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true, false, true); verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true, false, true); - verify(metrics, times(3)).successFetch(); - verify(metrics, times(2)).failedFetch(); - verify(spyFetcher).putBackRemainingMapOutputs(host); verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX)); verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX)); @@ -364,7 +358,6 @@ public class TestFetcher { ShuffleScheduler scheduler = mock(ShuffleScheduler.class); MergeManager merger = mock(MergeManager.class); - ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); Shuffle shuffle = mock(Shuffle.class); InputContext inputContext = mock(InputContext.class); when(inputContext.getCounters()).thenReturn(new TezCounters()); @@ -373,7 +366,7 @@ public class TestFetcher { HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); final MapHost host = new MapHost(HOST, PORT, 1); - FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID, false, false, true); @@ -449,7 +442,6 @@ public class TestFetcher { ShuffleScheduler scheduler = mock(ShuffleScheduler.class); MergeManager merger = mock(MergeManager.class); - ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); Shuffle shuffle = mock(Shuffle.class); TezCounters counters = new TezCounters(); @@ -463,7 +455,7 @@ public class TestFetcher { HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf); final MapHost host = new MapHost(HOST, PORT, 1); FetcherOrderedGrouped mockFetcher = - new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, jobMgr, + new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, shuffle, jobMgr, false, 0, null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, @@ -528,11 +520,10 @@ public class TestFetcher { Configuration conf = new TezConfiguration(); ShuffleScheduler scheduler = mock(ShuffleScheduler.class); MergeManager merger = mock(MergeManager.class); - ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class); Shuffle shuffle = mock(Shuffle.class); MapHost mapHost = new MapHost(HOST, PORT, 0); FetcherOrderedGrouped fetcher = - new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, + new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0, null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
