Repository: hadoop Updated Branches: refs/heads/trunk e2a027021 -> 7b5cf5352
HDFS-8829. Make SO_RCVBUF and SO_SNDBUF size configurable for DataTransferProtocol sockets and allow configuring auto-tuning (He Tianyi via Colin P. McCabe) Change-Id: I77dc71aaf9e14ef743f2a2cbebeec04a4f628c78 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7b5cf535 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7b5cf535 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7b5cf535 Branch: refs/heads/trunk Commit: 7b5cf5352efedc7d7ebdbb6b58f1b9a688812e75 Parents: e2a0270 Author: Colin Patrick Mccabe <[email protected]> Authored: Mon Sep 14 15:56:04 2015 -0700 Committer: Colin Patrick Mccabe <[email protected]> Committed: Mon Sep 14 16:02:10 2015 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 4 ++ .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 16 ++++- .../hadoop/hdfs/net/DomainPeerServer.java | 5 ++ .../org/apache/hadoop/hdfs/net/PeerServer.java | 9 ++- .../apache/hadoop/hdfs/net/TcpPeerServer.java | 5 ++ .../hadoop/hdfs/server/datanode/DNConf.java | 22 +++++- .../hadoop/hdfs/server/datanode/DataNode.java | 13 +++- .../hdfs/server/datanode/DataXceiver.java | 7 +- .../hdfs/server/datanode/DataXceiverServer.java | 7 +- .../src/main/resources/hdfs-default.xml | 22 ++++++ .../TestDataNodeTransferSocketSize.java | 71 ++++++++++++++++++++ 11 files changed, 169 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b5cf535/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 1b21c4d..270f30b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -928,6 +928,10 @@ Release 2.8.0 - UNRELEASED 
HDFS-8929. Add a metric to expose the timestamp of the last journal (surendra singh lilhore via vinayakumarb) + HDFS-8829. Make SO_RCVBUF and SO_SNDBUF size configurable for + DataTransferProtocol sockets and allow configuring auto-tuning (He Tianyi + via Colin P. McCabe) + BUG FIXES HDFS-7501. TransactionsSinceLastCheckpoint can be negative on SBNs. http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b5cf535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 62abc35..0498450 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaLruTracker; import org.apache.hadoop.http.HttpConfig; @@ -769,9 +770,20 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final boolean DFS_DATANODE_BLOCK_PINNING_ENABLED_DEFAULT = false; + public static final String + DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY = + "dfs.datanode.transfer.socket.send.buffer.size"; + public static final int + DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT = + HdfsConstants.DEFAULT_DATA_SOCKET_SIZE; + + public static final String + DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY = + 
"dfs.datanode.transfer.socket.recv.buffer.size"; + public static final int + DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT = + HdfsConstants.DEFAULT_DATA_SOCKET_SIZE; - - // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry @Deprecated public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b5cf535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java index 95a1388..5425bd5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/DomainPeerServer.java @@ -50,6 +50,11 @@ public class DomainPeerServer implements PeerServer { } @Override + public int getReceiveBufferSize() throws IOException { + return sock.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE); + } + + @Override public Peer accept() throws IOException, SocketTimeoutException { DomainSocket connSock = sock.accept(); Peer peer = null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b5cf535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java index c7b6b14..72974e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/PeerServer.java @@ -32,7 +32,14 @@ public interface 
PeerServer extends Closeable { public void setReceiveBufferSize(int size) throws IOException; /** - * Listens for a connection to be made to this server and accepts + * Get the receive buffer size of the PeerServer. + * + * @return The receive buffer size. + */ + int getReceiveBufferSize() throws IOException; + + /** + * Listens for a connection to be made to this server and accepts * it. The method blocks until a connection is made. * * @exception IOException if an I/O error occurs when waiting for a http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b5cf535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java index e31e46a..8858de8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/net/TcpPeerServer.java @@ -74,6 +74,11 @@ public class TcpPeerServer implements PeerServer { } @Override + public int getReceiveBufferSize() throws IOException { + return this.serverSocket.getReceiveBufferSize(); + } + + @Override public Peer accept() throws IOException, SocketTimeoutException { Peer peer = DFSUtilClient.peerFromSocket(serverSocket.accept()); return peer; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b5cf535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 9c25f5e..bd4943d 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -71,7 +71,9 @@ public class DNConf { final int socketTimeout; final int socketWriteTimeout; final int socketKeepaliveTimeout; - + private final int transferSocketSendBufferSize; + private final int transferSocketRecvBufferSize; + final boolean transferToAllowed; final boolean dropCacheBehindWrites; final boolean syncBehindWrites; @@ -114,8 +116,14 @@ public class DNConf { socketKeepaliveTimeout = conf.getInt( DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY, DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT); - - /* Based on results on different platforms, we might need set the default + this.transferSocketSendBufferSize = conf.getInt( + DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY, + DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_DEFAULT); + this.transferSocketRecvBufferSize = conf.getInt( + DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY, + DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT); + + /* Based on results on different platforms, we might need set the default * to false on some of them. 
*/ transferToAllowed = conf.getBoolean( DFS_DATANODE_TRANSFERTO_ALLOWED_KEY, @@ -279,4 +287,12 @@ public class DNConf { public boolean getAllowNonLocalLazyPersist() { return allowNonLocalLazyPersist; } + + public int getTransferSocketRecvBufferSize() { + return transferSocketRecvBufferSize; + } + + public int getTransferSocketSendBufferSize() { + return transferSocketSendBufferSize; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b5cf535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 0b0a0e8..d51d0a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -910,7 +910,10 @@ public class DataNode extends ReconfigurableBase tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout, DataNode.getStreamingAddr(conf)); } - tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + if (dnConf.getTransferSocketRecvBufferSize() > 0) { + tcpPeerServer.setReceiveBufferSize( + dnConf.getTransferSocketRecvBufferSize()); + } streamingAddr = tcpPeerServer.getStreamingAddr(); LOG.info("Opened streaming server at " + streamingAddr); this.threadGroup = new ThreadGroup("dataXceiverServer"); @@ -958,8 +961,12 @@ public class DataNode extends ReconfigurableBase } DomainPeerServer domainPeerServer = new DomainPeerServer(domainSocketPath, port); - domainPeerServer.setReceiveBufferSize( - HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); + int recvBufferSize = conf.getInt( + DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY, + 
DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_DEFAULT); + if (recvBufferSize > 0) { + domainPeerServer.setReceiveBufferSize(recvBufferSize); + } return domainPeerServer; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b5cf535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index efd2217..4f6dc96 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -709,8 +709,11 @@ class DataXceiver extends Receiver implements Runnable { (HdfsConstants.WRITE_TIMEOUT_EXTENSION * targets.length); NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue); mirrorSock.setSoTimeout(timeoutValue); - mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); - + if (dnConf.getTransferSocketSendBufferSize() > 0) { + mirrorSock.setSendBufferSize( + dnConf.getTransferSocketSendBufferSize()); + } + OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock, writeTimeout); InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b5cf535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java index caf6eaa..8d312a8 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java @@ -278,7 +278,12 @@ class DataXceiverServer implements Runnable { synchronized int getNumPeersXceiver() { return peersXceiver.size(); } - + + @VisibleForTesting + PeerServer getPeerServer() { + return peerServer; + } + synchronized void releasePeer(Peer peer) { peers.remove(peer); peersXceiver.remove(peer); http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b5cf535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 62665fc..e9b62c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2424,4 +2424,26 @@ </description> </property> +<property> + <name>dfs.datanode.transfer.socket.send.buffer.size</name> + <value>131072</value> + <description> + Socket send buffer size for DataXceiver (mirroring packets to the downstream + node in the pipeline). This may affect TCP connection throughput. + If it is set to zero or a negative value, no buffer size will be set + explicitly, thus enabling TCP auto-tuning on some systems. + </description> +</property> + +<property> + <name>dfs.datanode.transfer.socket.recv.buffer.size</name> + <value>131072</value> + <description> + Socket receive buffer size for DataXceiver (receiving packets from the client + or an upstream DataNode during block writing). This may affect TCP connection throughput. + If it is set to zero or a negative value, no buffer size will be set + explicitly, thus enabling TCP auto-tuning on some systems.
+ </description> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b5cf535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeTransferSocketSize.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeTransferSocketSize.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeTransferSocketSize.java new file mode 100644 index 0000000..0e98b86 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeTransferSocketSize.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.Test; + /** Tests that DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY controls the receive buffer size reported by the DataNode transfer peer server (datanode.getXferServer().getPeerServer()). Only the receive-buffer key is exercised; DFS_DATANODE_TRANSFER_SOCKET_SEND_BUFFER_SIZE_KEY is not covered by these tests. */ +public class TestDataNodeTransferSocketSize { + /** Configures a 4 KB receive buffer on a single-DataNode MiniDFSCluster and expects the peer server to report exactly 4 * 1024 bytes. NOTE(review): assumes the JVM/OS reports the SO_RCVBUF value back unchanged after it is set -- confirm on every CI platform. */ + @Test + public void testSpecifiedDataSocketSize() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setInt( + DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY, 4 * 1024); + SimulatedFSDataset.setFactory(conf); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + try { + List<DataNode> datanodes = cluster.getDataNodes(); + DataNode datanode = datanodes.get(0); + assertEquals("Receive buffer size should be 4K", + 4 * 1024, datanode.getXferServer().getPeerServer().getReceiveBufferSize()); + } finally { /* NOTE(review): build() is called before the try, so if it throws this finally never runs; the null check below looks redundant unless build() can return null. */ + if (cluster != null) { + cluster.shutdown(); + } + } + } + /** Configures a receive buffer size of 0. The DataNode only applies the configured size when it is > 0, so no size is set explicitly and the kernel's choice is kept. Because that value is platform-dependent, the test only asserts that it is positive. */ + @Test + public void testAutoTuningDataSocketSize() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setInt( + DFSConfigKeys.DFS_DATANODE_TRANSFER_SOCKET_RECV_BUFFER_SIZE_KEY, 0); + SimulatedFSDataset.setFactory(conf); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + try { + List<DataNode> datanodes = cluster.getDataNodes(); + DataNode datanode = datanodes.get(0); + assertTrue( + "Receive buffer size should be a default value (determined by kernel)", + datanode.getXferServer().getPeerServer().getReceiveBufferSize() > 0); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } +}
