Repository: tez Updated Branches: refs/heads/master c0270cb30 -> d415197c6
TEZ-3581. Add different logger to enable suppressing logs for specific lines. Contributed by Harish Jaiprakash. Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d415197c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d415197c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d415197c Branch: refs/heads/master Commit: d415197c67562ee2859b240e8a7e316067c4ed6b Parents: c0270cb Author: Siddharth Seth <[email protected]> Authored: Sun Feb 5 19:08:08 2017 -0800 Committer: Siddharth Seth <[email protected]> Committed: Sun Feb 5 19:08:08 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../tez/dag/history/HistoryEventHandler.java | 23 +++-- .../resources/tez-container-log4j.properties | 8 ++ .../org/apache/tez/http/HttpConnection.java | 18 +++- .../library/common/shuffle/ShuffleUtils.java | 92 ++++++++++++++------ .../common/shuffle/impl/ShuffleManager.java | 7 +- .../orderedgrouped/ShuffleScheduler.java | 8 +- .../common/shuffle/TestShuffleUtils.java | 34 ++++++-- 8 files changed, 144 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 37438d9..c4a1d72 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3581. Add different logger to enable suppressing logs for specific lines. TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at specific levels TEZ-3600. Fix flaky test: TestTokenCache TEZ-3589. add a unit test for amKeepAlive not being shutdown if an app takes a long time to launch. 
http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java index 79d1fc3..4fa1926 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +45,8 @@ import org.apache.tez.dag.records.TezTaskAttemptID; public class HistoryEventHandler extends CompositeService { private static Logger LOG = LoggerFactory.getLogger(HistoryEventHandler.class); + private static Logger LOG_CRITICAL_EVENTS = + LoggerFactory.getLogger(LOG.getName() + ".criticalEvents"); private final AppContext context; private RecoveryService recoveryService; @@ -59,6 +62,8 @@ public class HistoryEventHandler extends CompositeService { private final ConcurrentHashMap<TezTaskAttemptID, DAGHistoryEvent> suppressedEvents = new ConcurrentHashMap<>(); + private final AtomicLong criticalEventCount = new AtomicLong(); + public HistoryEventHandler(AppContext context) { super(HistoryEventHandler.class.getName()); this.context = context; @@ -141,12 +146,18 @@ public class HistoryEventHandler extends CompositeService { historyLoggingService.handle(event); } - // TODO at some point we should look at removing this once - // there is a UI in place - LOG.info("[HISTORY]" - + "[DAG:" + dagIdStr + "]" - + "[Event:" + event.getHistoryEvent().getEventType().name() + "]" - + ": " + event.getHistoryEvent().toString()); + if (LOG_CRITICAL_EVENTS.isInfoEnabled()) { + // TODO 
at some point we should look at removing this once + // there is a UI in place + LOG_CRITICAL_EVENTS.info("[HISTORY]" + + "[DAG:" + dagIdStr + "]" + + "[Event:" + event.getHistoryEvent().getEventType().name() + "]" + + ": " + event.getHistoryEvent().toString()); + } else { + if (criticalEventCount.incrementAndGet() % 1000 == 0) { + LOG.info("Got {} critical events", criticalEventCount); + } + } } private boolean shouldLogEvent(DAGHistoryEvent event) { http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-dag/src/main/resources/tez-container-log4j.properties ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/resources/tez-container-log4j.properties b/tez-dag/src/main/resources/tez-container-log4j.properties index 4620a78..721cd67 100644 --- a/tez-dag/src/main/resources/tez-container-log4j.properties +++ b/tez-dag/src/main/resources/tez-container-log4j.properties @@ -36,3 +36,11 @@ log4j.appender.CLA.layout.ConversionPattern=%d{ISO8601} [%p] [%t] |%c{2}|: %m%n # log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter +# Uncomment these to disable loggers that log heavily and reduce log sizes. Note that this will +# affect the analyzer, since it relies on these log lines. +# log4j.logger.org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager.fetch=WARN +# log4j.logger.org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler.fetch=WARN +# log4j.logger.org.apache.tez.http.HttpConnection.url=WARN + +# This setting must go in the AM's log4j.properties file; it has no effect from this file.
+# log4j.logger.org.apache.tez.dag.history.HistoryEventHandler.criticalEvents=WARN http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java index d781e64..9bfe4e7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java @@ -36,10 +36,12 @@ import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URL; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; public class HttpConnection extends BaseHttpConnection { private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class); + private static final Logger URL_LOG = LoggerFactory.getLogger(LOG.getName() + ".url"); private URL url; private final String logIdentifier; @@ -56,6 +58,7 @@ public class HttpConnection extends BaseHttpConnection { private final HttpConnectionParams httpConnParams; private final StopWatch stopWatch; + private final AtomicLong urlLogCount; /** * HttpConnection @@ -73,6 +76,7 @@ public class HttpConnection extends BaseHttpConnection { this.httpConnParams = connParams; this.url = url; this.stopWatch = new StopWatch(); + this.urlLogCount = new AtomicLong(); if (LOG.isDebugEnabled()) { LOG.debug("MapOutput URL :" + url.toString()); } @@ -229,9 +233,17 @@ public class HttpConnection extends BaseHttpConnection { // verify that replyHash is HMac of encHash SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr); - //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host - LOG.info("for url=" + url + - " sent hash and receievd reply " + stopWatch.now(TimeUnit.MILLISECONDS) 
+ " ms"); + if (URL_LOG.isInfoEnabled()) { + // Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM + // host + URL_LOG.info("for url=" + url + " sent hash and receievd reply " + + stopWatch.now(TimeUnit.MILLISECONDS) + " ms"); + } else { + // Log summary. + if (urlLogCount.incrementAndGet() % 1000 == 0) { + LOG.info("Sent hash and recieved reply for {} urls", urlLogCount); + } + } } /** http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index aa07233..82e844d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -29,6 +29,7 @@ import java.text.DecimalFormat; import java.util.BitSet; import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import java.util.zip.Deflater; import javax.annotation.Nullable; @@ -516,35 +517,70 @@ public class ShuffleUtils { return builder.build(); } - /** - * Log individual fetch complete event. 
- * This log information would be used by tez-tool/perf-analzyer/shuffle tools for mining - * - amount of data transferred between source to destination machine - * - time taken to transfer data between source to destination machine - * - details on DISK/DISK_DIRECT/MEMORY based shuffles - * - * @param log - * @param millis - * @param bytesCompressed - * @param bytesDecompressed - * @param outputType - * @param srcAttemptIdentifier - */ - public static void logIndividualFetchComplete(Logger log, long millis, long - bytesCompressed, - long bytesDecompressed, String outputType, InputAttemptIdentifier srcAttemptIdentifier) { - double rate = 0; - if (millis != 0) { - rate = bytesCompressed / ((double) millis / 1000); - rate = rate / (1024 * 1024); + public static class FetchStatsLogger { + private final Logger activeLogger; + private final Logger aggregateLogger; + private final AtomicLong logCount = new AtomicLong(); + private final AtomicLong compressedSize = new AtomicLong(); + private final AtomicLong decompressedSize = new AtomicLong(); + private final AtomicLong totalTime = new AtomicLong(); + + public FetchStatsLogger(Logger activeLogger, Logger aggregateLogger) { + this.activeLogger = activeLogger; + this.aggregateLogger = aggregateLogger; + } + + /** + * Log individual fetch complete event. 
+ * This log information would be used by tez-tool/perf-analzyer/shuffle tools for mining + * - amount of data transferred between source to destination machine + * - time taken to transfer data between source to destination machine + * - details on DISK/DISK_DIRECT/MEMORY based shuffles + * + * @param millis + * @param bytesCompressed + * @param bytesDecompressed + * @param outputType + * @param srcAttemptIdentifier + */ + public void logIndividualFetchComplete(long millis, long bytesCompressed, + long bytesDecompressed, String outputType, InputAttemptIdentifier srcAttemptIdentifier) { + double rate = 0; + if (millis != 0) { + rate = bytesCompressed / ((double) millis / 1000); + rate = rate / (1024 * 1024); + } + if (activeLogger.isInfoEnabled()) { + activeLogger.info( + "Completed fetch for attempt: " + + toShortString(srcAttemptIdentifier) + +" to " + outputType + + ", csize=" + bytesCompressed + ", dsize=" + bytesDecompressed + + ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" + + MBPS_FORMAT.get().format(rate) + " MB/s"); + } else { + long currentCount, currentCompressedSize, currentDecompressedSize, currentTotalTime; + synchronized (this) { + currentCount = logCount.incrementAndGet(); + currentCompressedSize = compressedSize.addAndGet(bytesCompressed); + currentDecompressedSize = decompressedSize.addAndGet(bytesDecompressed); + currentTotalTime = totalTime.addAndGet(millis); + if (currentCount % 1000 == 0) { + compressedSize.set(0); + decompressedSize.set(0); + totalTime.set(0); + } + } + if (currentCount % 1000 == 0) { + double avgRate = currentTotalTime == 0 ? 
0 + : currentCompressedSize / (double)currentTotalTime / 1000 / 1024 / 1024; + aggregateLogger.info("Completed {} fetches, stats for last 1000 fetches: " + + "avg csize: {}, avg dsize: {}, avgTime: {}, avgRate: {}", currentCount, + currentCompressedSize / 1000, currentDecompressedSize / 1000, currentTotalTime / 1000, + MBPS_FORMAT.get().format(avgRate)); + } + } } - log.info( - "Completed fetch for attempt: " - + toShortString(srcAttemptIdentifier) - +" to " + outputType + - ", csize=" + bytesCompressed + ", dsize=" + bytesDecompressed + - ", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" + - MBPS_FORMAT.get().format(rate) + " MB/s"); } private static String toShortString(InputAttemptIdentifier inputAttemptIdentifier) { http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index d034b2e..b2ff51d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -80,6 +80,7 @@ import org.apache.tez.runtime.library.common.shuffle.HostPort; import org.apache.tez.runtime.library.common.shuffle.InputHost; import org.apache.tez.runtime.library.common.shuffle.InputHost.PartitionToInputs; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils.FetchStatsLogger; import com.google.common.base.Objects; import com.google.common.base.Preconditions; @@ -98,6 +99,8 @@ import 
com.google.common.util.concurrent.ThreadFactoryBuilder; public class ShuffleManager implements FetcherCallback { private static final Logger LOG = LoggerFactory.getLogger(ShuffleManager.class); + private static final Logger LOG_FETCH = LoggerFactory.getLogger(LOG.getName() + ".fetch"); + private static final FetchStatsLogger fetchStatsLogger = new FetchStatsLogger(LOG_FETCH, LOG); private final InputContext inputContext; private final int numInputs; @@ -628,8 +631,8 @@ public class ShuffleManager implements FetcherCallback { if (!completedInputSet.contains(inputIdentifier)) { fetchedInput.commit(); committed = true; - ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration, - fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier); + fetchStatsLogger.logIndividualFetchComplete(copyDuration, fetchedBytes, + decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier); // Processing counters for completed and commit fetches only. Need // additional counters for excessive fetches - which primarily comes http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java index 3d2c1ad..cce486c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java @@ -73,6 +73,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import 
org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils.FetchStatsLogger; import org.apache.tez.runtime.library.common.shuffle.HostPort; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPortPartition; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type; @@ -140,6 +141,9 @@ class ShuffleScheduler { private final AtomicLong shuffleStart = new AtomicLong(0); private static final Logger LOG = LoggerFactory.getLogger(ShuffleScheduler.class); + private static final Logger LOG_FETCH = LoggerFactory.getLogger(LOG.getName() + ".fetch"); + private static final FetchStatsLogger fetchStatsLogger = new FetchStatsLogger(LOG_FETCH, LOG); + static final long INITIAL_PENALTY = 2000L; // 2 seconds private static final float PENALTY_GROWTH_RATE = 1.3f; @@ -576,8 +580,8 @@ class ShuffleScheduler { } output.commit(); - ShuffleUtils.logIndividualFetchComplete(LOG, millis, bytesCompressed, - bytesDecompressed, output.getType().toString(), srcAttemptIdentifier); + fetchStatsLogger.logIndividualFetchComplete(millis, bytesCompressed, bytesDecompressed, + output.getType().toString(), srcAttemptIdentifier); if (output.getType() == Type.DISK) { bytesShuffledToDisk.increment(bytesCompressed); } else if (output.getType() == Type.DISK_DIRECT) { http://git-wip-us.apache.org/repos/asf/tez/blob/d415197c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java index 496468b..f21da7c 100644 --- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java @@ -4,7 +4,6 @@ import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; @@ -24,7 +23,8 @@ import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; -import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream; +import org.apache.tez.runtime.library.common.InputAttemptIdentifier; +import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils.FetchStatsLogger; import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord; import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord; import org.apache.tez.runtime.library.partitioner.HashPartitioner; @@ -32,6 +32,7 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Matchers; import org.slf4j.Logger; import java.io.ByteArrayInputStream; @@ -47,8 +48,11 @@ import java.util.Random; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -70,10 +74,6 @@ import static org.mockito.Mockito.when; */ public class TestShuffleUtils { - private static final String 
HOST = "localhost"; - private static final int PORT = 8080; - private static final String PATH_COMPONENT = "attempt"; - private OutputContext outputContext; private Configuration conf; private FileSystem localFs; @@ -313,4 +313,26 @@ public class TestShuffleUtils { } catch (IOException e) { } } + + @Test + public void testFetchStatsLogger() throws Exception { + Logger activeLogger = mock(Logger.class); + Logger aggregateLogger = mock(Logger.class); + FetchStatsLogger logger = new FetchStatsLogger(activeLogger, aggregateLogger); + + InputAttemptIdentifier ident = new InputAttemptIdentifier(1, 1); + when(activeLogger.isInfoEnabled()).thenReturn(false); + for (int i = 0; i < 1000; i++) { + logger.logIndividualFetchComplete(10, 100, 1000, "testType", ident); + } + verify(activeLogger, times(0)).info(anyString()); + verify(aggregateLogger, times(1)).info(anyString(), Matchers.<Object[]>anyVararg()); + + when(activeLogger.isInfoEnabled()).thenReturn(true); + for (int i = 0; i < 1000; i++) { + logger.logIndividualFetchComplete(10, 100, 1000, "testType", ident); + } + verify(activeLogger, times(1000)).info(anyString()); + verify(aggregateLogger, times(1)).info(anyString(), Matchers.<Object[]>anyVararg()); + } }
