Author: todd
Date: Fri May 4 18:50:54 2012
New Revision: 1334116
URL: http://svn.apache.org/viewvc?rev=1334116&view=rev
Log:
HDFS-3357. DataXceiver reads from client socket with incorrect/no timeout.
Contributed by Todd Lipcon.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1334116&r1=1334115&r2=1334116&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri May 4 18:50:54 2012
@@ -597,6 +597,9 @@ Release 2.0.0 - UNRELEASED
HDFS-3350. In INode, add final to compareTo(..), equals(..) and hashCode(),
and remove synchronized from updatePermissionStatus(..). (szetszwo)
+ HDFS-3357. DataXceiver reads from client socket with incorrect/no timeout
+ (todd)
+
BREAKDOWN OF HDFS-1623 SUBTASKS
HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1334116&r1=1334115&r2=1334116&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri May 4 18:50:54 2012
@@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.SocketInputWrapper;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@@ -83,13 +84,24 @@ class DataXceiver extends Receiver imple
private final DataXceiverServer dataXceiverServer;
private long opStartTime; //the start time of receiving an Op
+ private final SocketInputWrapper socketInputWrapper;
- public DataXceiver(Socket s, DataNode datanode,
+ public static DataXceiver create(Socket s, DataNode dn,
+ DataXceiverServer dataXceiverServer) throws IOException {
+
+ SocketInputWrapper iw = NetUtils.getInputStream(s);
+ return new DataXceiver(s, iw, dn, dataXceiverServer);
+ }
+
+ private DataXceiver(Socket s,
+ SocketInputWrapper socketInput,
+ DataNode datanode,
DataXceiverServer dataXceiverServer) throws IOException {
super(new DataInputStream(new BufferedInputStream(
- NetUtils.getInputStream(s), HdfsConstants.SMALL_BUFFER_SIZE)));
+ socketInput, HdfsConstants.SMALL_BUFFER_SIZE)));
this.s = s;
+ this.socketInputWrapper = socketInput;
this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
this.datanode = datanode;
this.dnConf = datanode.getDnConf();
@@ -128,8 +140,6 @@ class DataXceiver extends Receiver imple
Op op = null;
dataXceiverServer.childSockets.add(s);
try {
- int stdTimeout = s.getSoTimeout();
-
// We process requests in a loop, and stay around for a short timeout.
// This optimistic behaviour allows the other end to reuse connections.
// Setting keepalive timeout to 0 disable this behavior.
@@ -139,7 +149,9 @@ class DataXceiver extends Receiver imple
try {
if (opsProcessed != 0) {
assert dnConf.socketKeepaliveTimeout > 0;
- s.setSoTimeout(dnConf.socketKeepaliveTimeout);
+ socketInputWrapper.setTimeout(dnConf.socketKeepaliveTimeout);
+ } else {
+ socketInputWrapper.setTimeout(dnConf.socketTimeout);
}
op = readOp();
} catch (InterruptedIOException ignored) {
@@ -160,7 +172,7 @@ class DataXceiver extends Receiver imple
// restore normal timeout
if (opsProcessed != 0) {
- s.setSoTimeout(stdTimeout);
+ s.setSoTimeout(dnConf.socketTimeout);
}
opStartTime = now();
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1334116&r1=1334115&r2=1334116&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Fri May 4 18:50:54 2012
@@ -135,6 +135,7 @@ class DataXceiverServer implements Runna
try {
s = ss.accept();
s.setTcpNoDelay(true);
+ // Timeouts are set within DataXceiver.run()
// Make sure the xceiver count is not exceeded
int curXceiverCount = datanode.getXceiverCount();
@@ -144,7 +145,8 @@ class DataXceiverServer implements Runna
+ maxXceiverCount);
}
- new Daemon(datanode.threadGroup, new DataXceiver(s, datanode, this))
+ new Daemon(datanode.threadGroup,
+ DataXceiver.create(s, datanode, this))
.start();
} catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java?rev=1334116&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java Fri May 4 18:50:54 2012
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.junit.Assert.*;
+
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDataTransferKeepalive {
+ Configuration conf = new HdfsConfiguration();
+ private MiniDFSCluster cluster;
+ private FileSystem fs;
+ private InetSocketAddress dnAddr;
+ private DataNode dn;
+ private DFSClient dfsClient;
+ private static Path TEST_FILE = new Path("/test");
+
+ private static final int KEEPALIVE_TIMEOUT = 1000;
+ private static final int WRITE_TIMEOUT = 3000;
+
+ @Before
+ public void setup() throws Exception {
+ conf.setInt(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
+ KEEPALIVE_TIMEOUT);
+
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1).build();
+ fs = cluster.getFileSystem();
+ dfsClient = ((DistributedFileSystem)fs).dfs;
+
+ String poolId = cluster.getNamesystem().getBlockPoolId();
+ dn = cluster.getDataNodes().get(0);
+ DatanodeRegistration dnReg = DataNodeTestUtils.getDNRegistrationForBP(
+ dn, poolId);
+ dnAddr = NetUtils.createSocketAddr(dnReg.getXferAddr());
+ }
+
+ @After
+ public void teardown() {
+ cluster.shutdown();
+ }
+
+ /**
+ * Regression test for HDFS-3357. Check that the datanode is respecting
+ * its configured keepalive timeout.
+ */
+ @Test(timeout=30000)
+ public void testKeepaliveTimeouts() throws Exception {
+ DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L);
+
+ // Clients that write aren't currently re-used.
+ assertEquals(0, dfsClient.socketCache.size());
+ assertXceiverCount(0);
+
+ // Reads the file, so we should get a
+ // cached socket, and should have an xceiver on the other side.
+ DFSTestUtil.readFile(fs, TEST_FILE);
+ assertEquals(1, dfsClient.socketCache.size());
+ assertXceiverCount(1);
+
+ // Sleep for a bit longer than the keepalive timeout
+ // and make sure the xceiver died.
+ Thread.sleep(KEEPALIVE_TIMEOUT * 2);
+ assertXceiverCount(0);
+
+ // The socket is still in the cache, because we don't
+ // notice that it's closed until we try to read
+ // from it again.
+ assertEquals(1, dfsClient.socketCache.size());
+
+ // Take it out of the cache - reading should
+ // give an EOF.
+ Socket s = dfsClient.socketCache.get(dnAddr);
+ assertNotNull(s);
+ assertEquals(-1, NetUtils.getInputStream(s).read());
+ }
+
+ /**
+ * Test for the case where the client begins to read a long block, but doesn't
+ * read bytes off the stream quickly. The datanode should time out sending the
+ * chunks and the transceiver should die, even if it has a long keepalive.
+ */
+ @Test(timeout=30000)
+ public void testSlowReader() throws Exception {
+ // Restart the DN with a shorter write timeout.
+ DataNodeProperties props = cluster.stopDataNode(0);
+ props.conf.setInt(DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,
+ WRITE_TIMEOUT);
+ props.conf.setInt(DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
+ 120000);
+ assertTrue(cluster.restartDataNode(props, true));
+ // Wait for heartbeats to avoid a startup race where we
+ // try to write the block while the DN is still starting.
+ cluster.triggerHeartbeats();
+
+ dn = cluster.getDataNodes().get(0);
+
+ DFSTestUtil.createFile(fs, TEST_FILE, 1024*1024*8L, (short)1, 0L);
+ FSDataInputStream stm = fs.open(TEST_FILE);
+ try {
+ stm.read();
+ assertXceiverCount(1);
+
+ Thread.sleep(WRITE_TIMEOUT + 1000);
+ // DN should time out in sendChunks, and this should force
+ // the xceiver to exit.
+ assertXceiverCount(0);
+ } finally {
+ IOUtils.closeStream(stm);
+ }
+ }
+
+ private void assertXceiverCount(int expected) {
+ // Subtract 1, since the DataXceiverServer
+ // counts as one
+ int count = dn.getXceiverCount() - 1;
+ if (count != expected) {
+ ReflectionUtils.printThreadInfo(
+ new PrintWriter(System.err),
+ "Thread dumps");
+ fail("Expected " + expected + " xceivers, found " +
+ count);
+ }
+ }
+}