Repository: spark Updated Branches: refs/heads/branch-2.0 ff2e270eb -> 73014a2aa
[SPARK-17231][CORE] Avoid building debug or trace log messages unless the respective log level is enabled This is simply a backport of #14798 to `branch-2.0`. This backport omits the change to `ExternalShuffleBlockHandler.java`. In `branch-2.0`, that file does not contain the log message that was patched in `master`. Author: Michael Allman <[email protected]> Closes #14811 from mallman/spark-17231-logging_perf_improvements-2.0_backport. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73014a2a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73014a2a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73014a2a Branch: refs/heads/branch-2.0 Commit: 73014a2aa96b538d963f360fd41bac74f358ef46 Parents: ff2e270 Author: Michael Allman <[email protected]> Authored: Thu Aug 25 16:29:04 2016 -0700 Committer: Shixiong Zhu <[email protected]> Committed: Thu Aug 25 16:29:04 2016 -0700 ---------------------------------------------------------------------- .../spark/network/client/TransportClient.java | 39 ++++++++++++-------- .../network/client/TransportClientFactory.java | 2 +- .../client/TransportResponseHandler.java | 15 ++++---- .../spark/network/protocol/MessageDecoder.java | 2 +- .../network/server/TransportChannelHandler.java | 6 +-- .../network/server/TransportRequestHandler.java | 18 ++++----- .../spark/network/server/TransportServer.java | 2 +- .../shuffle/ExternalShuffleBlockResolver.java | 2 +- 8 files changed, 47 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/73014a2a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java index 64a8317..a67683b 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -43,7 +43,7 @@ import org.apache.spark.network.protocol.OneWayMessage; import org.apache.spark.network.protocol.RpcRequest; import org.apache.spark.network.protocol.StreamChunkId; import org.apache.spark.network.protocol.StreamRequest; -import org.apache.spark.network.util.NettyUtils; +import static org.apache.spark.network.util.NettyUtils.getRemoteAddress; /** * Client for fetching consecutive chunks of a pre-negotiated stream. This API is intended to allow @@ -135,9 +135,10 @@ public class TransportClient implements Closeable { long streamId, final int chunkIndex, final ChunkReceivedCallback callback) { - final String serverAddr = NettyUtils.getRemoteAddress(channel); final long startTime = System.currentTimeMillis(); - logger.debug("Sending fetch chunk request {} to {}", chunkIndex, serverAddr); + if (logger.isDebugEnabled()) { + logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel)); + } final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex); handler.addFetchRequest(streamChunkId, callback); @@ -148,11 +149,13 @@ public class TransportClient implements Closeable { public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { long timeTaken = System.currentTimeMillis() - startTime; - logger.trace("Sending request {} to {} took {} ms", streamChunkId, serverAddr, - timeTaken); + if (logger.isTraceEnabled()) { + logger.trace("Sending request {} to {} took {} ms", streamChunkId, getRemoteAddress(channel), + timeTaken); + } } else { String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId, - serverAddr, future.cause()); + getRemoteAddress(channel), future.cause()); logger.error(errorMsg, future.cause()); handler.removeFetchRequest(streamChunkId); channel.close(); @@ -173,9 +176,10 @@ public class TransportClient implements Closeable { * @param callback Object to call with the stream data. */ public void stream(final String streamId, final StreamCallback callback) { - final String serverAddr = NettyUtils.getRemoteAddress(channel); final long startTime = System.currentTimeMillis(); - logger.debug("Sending stream request for {} to {}", streamId, serverAddr); + if (logger.isDebugEnabled()) { + logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel)); + } // Need to synchronize here so that the callback is added to the queue and the RPC is // written to the socket atomically, so that callbacks are called in the right order @@ -188,11 +192,13 @@ public class TransportClient implements Closeable { public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { long timeTaken = System.currentTimeMillis() - startTime; - logger.trace("Sending request for {} to {} took {} ms", streamId, serverAddr, - timeTaken); + if (logger.isTraceEnabled()) { + logger.trace("Sending request for {} to {} took {} ms", streamId, getRemoteAddress(channel), + timeTaken); + } } else { String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId, - serverAddr, future.cause()); + getRemoteAddress(channel), future.cause()); logger.error(errorMsg, future.cause()); channel.close(); try { @@ -215,9 +221,10 @@ public class TransportClient implements Closeable { * @return The RPC's id. */ public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) { - final String serverAddr = NettyUtils.getRemoteAddress(channel); final long startTime = System.currentTimeMillis(); - logger.trace("Sending RPC to {}", serverAddr); + if (logger.isTraceEnabled()) { + logger.trace("Sending RPC to {}", getRemoteAddress(channel)); + } final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits()); handler.addRpcRequest(requestId, callback); @@ -228,10 +235,12 @@ public class TransportClient implements Closeable { public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { long timeTaken = System.currentTimeMillis() - startTime; - logger.trace("Sending request {} to {} took {} ms", requestId, serverAddr, timeTaken); + if (logger.isTraceEnabled()) { + logger.trace("Sending request {} to {} took {} ms", requestId, getRemoteAddress(channel), timeTaken); + } } else { String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId, - serverAddr, future.cause()); + getRemoteAddress(channel), future.cause()); logger.error(errorMsg, future.cause()); handler.removeRpcRequest(requestId); channel.close(); http://git-wip-us.apache.org/repos/asf/spark/blob/73014a2a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index a27aaf2..1c9916b 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -195,7 +195,7 @@ public class TransportClientFactory implements Closeable { /** Create a completely new {@link TransportClient} to the remote address. */ private TransportClient createClient(InetSocketAddress address) throws IOException { - logger.debug("Creating new connection to " + address); + logger.debug("Creating new connection to {}", address); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) http://git-wip-us.apache.org/repos/asf/spark/blob/73014a2a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java index 8a69223..1796672 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java @@ -38,7 +38,7 @@ import org.apache.spark.network.protocol.StreamChunkId; import org.apache.spark.network.protocol.StreamFailure; import org.apache.spark.network.protocol.StreamResponse; import org.apache.spark.network.server.MessageHandler; -import org.apache.spark.network.util.NettyUtils; +import static org.apache.spark.network.util.NettyUtils.getRemoteAddress; import org.apache.spark.network.util.TransportFrameDecoder; /** @@ -122,7 +122,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { @Override public void channelInactive() { if (numOutstandingRequests() > 0) { - String remoteAddress = NettyUtils.getRemoteAddress(channel); + String remoteAddress = getRemoteAddress(channel); logger.error("Still have {} requests outstanding when connection from {} is closed", numOutstandingRequests(), remoteAddress); failOutstandingRequests(new IOException("Connection from " + remoteAddress + " closed")); @@ -132,7 +132,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { @Override public void exceptionCaught(Throwable cause) { if (numOutstandingRequests() > 0) { - String remoteAddress = NettyUtils.getRemoteAddress(channel); + String remoteAddress = getRemoteAddress(channel); logger.error("Still have {} requests outstanding when connection from {} is closed", numOutstandingRequests(), remoteAddress); failOutstandingRequests(cause); @@ -141,13 +141,12 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { @Override public void handle(ResponseMessage message) throws Exception { - String remoteAddress = NettyUtils.getRemoteAddress(channel); if (message instanceof ChunkFetchSuccess) { ChunkFetchSuccess resp = (ChunkFetchSuccess) message; ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId); if (listener == null) { logger.warn("Ignoring response for block {} from {} since it is not outstanding", - resp.streamChunkId, remoteAddress); + resp.streamChunkId, getRemoteAddress(channel)); resp.body().release(); } else { outstandingFetches.remove(resp.streamChunkId); @@ -159,7 +158,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId); if (listener == null) { logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding", - resp.streamChunkId, remoteAddress, resp.errorString); + resp.streamChunkId, getRemoteAddress(channel), resp.errorString); } else { outstandingFetches.remove(resp.streamChunkId); listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException( @@ -170,7 +169,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { RpcResponseCallback listener = outstandingRpcs.get(resp.requestId); if (listener == null) { logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding", - resp.requestId, remoteAddress, resp.body().size()); + resp.requestId, getRemoteAddress(channel), resp.body().size()); } else { outstandingRpcs.remove(resp.requestId); try { @@ -184,7 +183,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> { RpcResponseCallback listener = outstandingRpcs.get(resp.requestId); if (listener == null) { logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding", - resp.requestId, remoteAddress, resp.errorString); + resp.requestId, getRemoteAddress(channel), resp.errorString); } else { outstandingRpcs.remove(resp.requestId); listener.onFailure(new RuntimeException(resp.errorString)); http://git-wip-us.apache.org/repos/asf/spark/blob/73014a2a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java index 074780f..f045318 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageDecoder.java @@ -39,7 +39,7 @@ public final class MessageDecoder extends MessageToMessageDecoder<ByteBuf> { Message.Type msgType = Message.Type.decode(in); Message decoded = decode(msgType, in); assert decoded.type() == msgType; - logger.trace("Received message " + msgType + ": " + decoded); + logger.trace("Received message {}: {}", msgType, decoded); out.add(decoded); } http://git-wip-us.apache.org/repos/asf/spark/blob/73014a2a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java index f222337..884ea7d 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java @@ -29,7 +29,7 @@ import org.apache.spark.network.client.TransportResponseHandler; import org.apache.spark.network.protocol.Message; import org.apache.spark.network.protocol.RequestMessage; import org.apache.spark.network.protocol.ResponseMessage; -import org.apache.spark.network.util.NettyUtils; +import static org.apache.spark.network.util.NettyUtils.getRemoteAddress; /** * The single Transport-level Channel handler which is used for delegating requests to the @@ -76,7 +76,7 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - logger.warn("Exception in connection from " + NettyUtils.getRemoteAddress(ctx.channel()), + logger.warn("Exception in connection from " + getRemoteAddress(ctx.channel()), cause); requestHandler.exceptionCaught(cause); responseHandler.exceptionCaught(cause); @@ -139,7 +139,7 @@ public class TransportChannelHandler extends SimpleChannelInboundHandler<Message System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs; if (e.state() == IdleState.ALL_IDLE && isActuallyOverdue) { if (responseHandler.numOutstandingRequests() > 0) { - String address = NettyUtils.getRemoteAddress(ctx.channel()); + String address = getRemoteAddress(ctx.channel()); logger.error("Connection to {} has been quiet for {} ms while there are outstanding " + "requests. Assuming connection is dead; please adjust spark.network.timeout if " + "this is wrong.", address, requestTimeoutNs / 1000 / 1000); http://git-wip-us.apache.org/repos/asf/spark/blob/73014a2a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index bebe88e..e67a034 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -17,6 +17,7 @@ package org.apache.spark.network.server; +import java.net.SocketAddress; import java.nio.ByteBuffer; import com.google.common.base.Throwables; @@ -42,7 +43,7 @@ import org.apache.spark.network.protocol.RpcResponse; import org.apache.spark.network.protocol.StreamFailure; import org.apache.spark.network.protocol.StreamRequest; import org.apache.spark.network.protocol.StreamResponse; -import org.apache.spark.network.util.NettyUtils; +import static org.apache.spark.network.util.NettyUtils.getRemoteAddress; /** * A handler that processes requests from clients and writes chunk data back. Each handler is @@ -114,9 +115,9 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> { } private void processFetchRequest(final ChunkFetchRequest req) { - final String client = NettyUtils.getRemoteAddress(channel); - - logger.trace("Received req from {} to fetch block {}", client, req.streamChunkId); + if (logger.isTraceEnabled()) { + logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), req.streamChunkId); + } ManagedBuffer buf; try { @@ -125,7 +126,7 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> { buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex); } catch (Exception e) { logger.error(String.format( - "Error opening block %s for request from %s", req.streamChunkId, client), e); + "Error opening block %s for request from %s", req.streamChunkId, getRemoteAddress(channel)), e); respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e))); return; } @@ -134,13 +135,12 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> { } private void processStreamRequest(final StreamRequest req) { - final String client = NettyUtils.getRemoteAddress(channel); ManagedBuffer buf; try { buf = streamManager.openStream(req.streamId); } catch (Exception e) { logger.error(String.format( - "Error opening stream %s for request from %s", req.streamId, client), e); + "Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e); respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e))); return; } @@ -189,13 +189,13 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> { * it will be logged and the channel closed. */ private void respond(final Encodable result) { - final String remoteAddress = channel.remoteAddress().toString(); + final SocketAddress remoteAddress = channel.remoteAddress(); channel.writeAndFlush(result).addListener( new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { - logger.trace(String.format("Sent result %s to client %s", result, remoteAddress)); + logger.trace("Sent result {} to client {}", result, remoteAddress); } else { logger.error(String.format("Error sending result %s to %s; closing connection", result, remoteAddress), future.cause()); http://git-wip-us.apache.org/repos/asf/spark/blob/73014a2a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java index baae235..a67db4f 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -130,7 +130,7 @@ public class TransportServer implements Closeable { channelFuture.syncUninterruptibly(); port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort(); - logger.debug("Shuffle server started on port :" + port); + logger.debug("Shuffle server started on port: {}", port); } @Override http://git-wip-us.apache.org/repos/asf/spark/blob/73014a2a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java index 54e870a..000ec13 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java @@ -244,7 +244,7 @@ public class ExternalShuffleBlockResolver { for (String localDir : dirs) { try { JavaUtils.deleteRecursively(new File(localDir)); - logger.debug("Successfully cleaned up directory: " + localDir); + logger.debug("Successfully cleaned up directory: {}", localDir); } catch (Exception e) { logger.error("Failed to delete directory: " + localDir, e); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
