Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1430507&r1=1430506&r2=1430507&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Jan 8 20:44:09 2013 @@ -39,6 +39,7 @@ import java.nio.channels.ClosedChannelEx import java.util.Arrays; import org.apache.commons.logging.Log; +import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -64,7 +65,6 @@ import org.apache.hadoop.hdfs.server.pro import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.net.SocketInputWrapper; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; @@ -79,8 +79,7 @@ class DataXceiver extends Receiver imple public static final Log LOG = DataNode.LOG; static final Log ClientTraceLog = DataNode.ClientTraceLog; - private final Socket s; - private final boolean isLocal; //is a local connection? + private final Peer peer; private final String remoteAddress; // address of remote side private final String localAddress; // local address of this daemon private final DataNode datanode; @@ -88,7 +87,7 @@ class DataXceiver extends Receiver imple private final DataXceiverServer dataXceiverServer; private final boolean connectToDnViaHostname; private long opStartTime; //the start time of receiving an Op - private final SocketInputWrapper socketIn; + private final InputStream socketIn; private OutputStream socketOut; /** @@ -97,25 +96,23 @@ class DataXceiver extends Receiver imple */ private String previousOpClientName; - public static DataXceiver create(Socket s, DataNode dn, + public static DataXceiver create(Peer peer, DataNode dn, DataXceiverServer dataXceiverServer) throws IOException { - return new DataXceiver(s, dn, dataXceiverServer); + return new DataXceiver(peer, dn, dataXceiverServer); } - private DataXceiver(Socket s, - DataNode datanode, + private DataXceiver(Peer peer, DataNode datanode, DataXceiverServer dataXceiverServer) throws IOException { - this.s = s; + this.peer = peer; this.dnConf = datanode.getDnConf(); - this.socketIn = NetUtils.getInputStream(s); - this.socketOut = NetUtils.getOutputStream(s, dnConf.socketWriteTimeout); - this.isLocal = s.getInetAddress().equals(s.getLocalAddress()); + this.socketIn = peer.getInputStream(); + this.socketOut = peer.getOutputStream(); this.datanode = datanode; this.dataXceiverServer = dataXceiverServer; this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname; - remoteAddress = s.getRemoteSocketAddress().toString(); - localAddress = s.getLocalSocketAddress().toString(); + remoteAddress = peer.getRemoteAddressString(); + localAddress = peer.getLocalAddressString(); if (LOG.isDebugEnabled()) { LOG.debug("Number of active connections is: " @@ -155,11 +152,10 @@ class DataXceiver extends Receiver imple public void run() { int opsProcessed = 0; Op op = null; - - dataXceiverServer.childSockets.add(s); - + + dataXceiverServer.addPeer(peer); try { - + peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout); InputStream input = socketIn; if (dnConf.encryptDataTransfer) { IOStreamPair encryptedStreams = null; @@ -169,8 +165,9 @@ class DataXceiver extends Receiver imple dnConf.encryptionAlgorithm); } catch (InvalidMagicNumberException imne) { LOG.info("Failed to read expected encryption handshake from client " + - "at " + s.getInetAddress() + ". Perhaps the client is running an " + - "older version of Hadoop which does not support encryption"); + "at " + peer.getRemoteAddressString() + ". Perhaps the client " + + "is running an older version of Hadoop which does not support " + + "encryption"); return; } input = encryptedStreams.in; @@ -189,9 +186,9 @@ class DataXceiver extends Receiver imple try { if (opsProcessed != 0) { assert dnConf.socketKeepaliveTimeout > 0; - socketIn.setTimeout(dnConf.socketKeepaliveTimeout); + peer.setReadTimeout(dnConf.socketKeepaliveTimeout); } else { - socketIn.setTimeout(dnConf.socketTimeout); + peer.setReadTimeout(dnConf.socketTimeout); } op = readOp(); } catch (InterruptedIOException ignored) { @@ -202,7 +199,7 @@ class DataXceiver extends Receiver imple if (opsProcessed > 0 && (err instanceof EOFException || err instanceof ClosedChannelException)) { if (LOG.isDebugEnabled()) { - LOG.debug("Cached " + s.toString() + " closing after " + opsProcessed + " ops"); + LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops"); } } else { throw err; @@ -212,13 +209,13 @@ class DataXceiver extends Receiver imple // restore normal timeout if (opsProcessed != 0) { - s.setSoTimeout(dnConf.socketTimeout); + peer.setReadTimeout(dnConf.socketTimeout); } opStartTime = now(); processOp(op); ++opsProcessed; - } while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0); + } while (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0); } catch (Throwable t) { LOG.error(datanode.getDisplayName() + ":DataXceiver error processing " + ((op == null) ? "unknown" : op.name()) + " operation " + @@ -230,9 +227,8 @@ class DataXceiver extends Receiver imple + datanode.getXceiverCount()); } updateCurrentThreadName("Cleaning up"); + dataXceiverServer.closePeer(peer); IOUtils.closeStream(in); - IOUtils.closeSocket(s); - dataXceiverServer.childSockets.remove(s); } } @@ -286,8 +282,9 @@ class DataXceiver extends Receiver imple ClientReadStatusProto stat = ClientReadStatusProto.parseFrom( HdfsProtoUtil.vintPrefixed(in)); if (!stat.hasStatus()) { - LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " + - "code after reading. Will close connection."); + LOG.warn("Client " + peer.getRemoteAddressString() + + " did not send a valid status code after reading. " + + "Will close connection."); IOUtils.closeStream(out); } } catch (IOException ioe) { @@ -320,7 +317,7 @@ class DataXceiver extends Receiver imple //update metrics datanode.metrics.addReadBlockOp(elapsed()); - datanode.metrics.incrReadsFromClient(isLocal); + datanode.metrics.incrReadsFromClient(peer.isLocal()); } @Override @@ -358,8 +355,8 @@ class DataXceiver extends Receiver imple LOG.debug("isDatanode=" + isDatanode + ", isClient=" + isClient + ", isTransfer=" + isTransfer); - LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() + - " tcp no delay " + s.getTcpNoDelay()); + LOG.debug("writeBlock receive buf size " + peer.getReceiveBufferSize() + + " tcp no delay " + peer.getTcpNoDelay()); } // We later mutate block's generation stamp and length, but we need to @@ -390,8 +387,8 @@ class DataXceiver extends Receiver imple stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { // open a block receiver blockReceiver = new BlockReceiver(block, in, - s.getRemoteSocketAddress().toString(), - s.getLocalSocketAddress().toString(), + peer.getRemoteAddressString(), + peer.getLocalAddressString(), stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, datanode, requestedChecksum); } else { @@ -546,7 +543,7 @@ class DataXceiver extends Receiver imple //update metrics datanode.metrics.addWriteBlockOp(elapsed()); - datanode.metrics.incrWritesFromClient(isLocal); + datanode.metrics.incrWritesFromClient(peer.isLocal()); } @Override @@ -554,7 +551,7 @@ class DataXceiver extends Receiver imple final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets) throws IOException { - checkAccess(null, true, blk, blockToken, + checkAccess(socketOut, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); previousOpClientName = clientName; updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); @@ -641,8 +638,9 @@ class DataXceiver extends Receiver imple } if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start - String msg = "Not able to copy block " + block.getBlockId() + " to " - + s.getRemoteSocketAddress() + " because threads quota is exceeded."; + String msg = "Not able to copy block " + block.getBlockId() + " " + + "to " + peer.getRemoteAddressString() + " because threads " + + "quota is exceeded."; LOG.info(msg); sendResponse(ERROR, msg); return; @@ -671,7 +669,7 @@ class DataXceiver extends Receiver imple datanode.metrics.incrBytesRead((int) read); datanode.metrics.incrBlocksRead(); - LOG.info("Copied " + block + " to " + s.getRemoteSocketAddress()); + LOG.info("Copied " + block + " to " + peer.getRemoteAddressString()); } catch (IOException ioe) { isOpSuccess = false; LOG.info("opCopyBlock " + block + " received exception " + ioe); @@ -716,8 +714,9 @@ class DataXceiver extends Receiver imple } if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start - String msg = "Not able to receive block " + block.getBlockId() + " from " - + s.getRemoteSocketAddress() + " because threads quota is exceeded."; + String msg = "Not able to receive block " + block.getBlockId() + + " from " + peer.getRemoteAddressString() + " because threads " + + "quota is exceeded."; LOG.warn(msg); sendResponse(ERROR, msg); return; @@ -794,7 +793,7 @@ class DataXceiver extends Receiver imple // notify name node datanode.notifyNamenodeReceivedBlock(block, delHint); - LOG.info("Moved " + block + " from " + s.getRemoteSocketAddress()); + LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()); } catch (IOException ioe) { opStatus = ERROR; @@ -817,7 +816,7 @@ class DataXceiver extends Receiver imple try { sendResponse(opStatus, errMsg); } catch (IOException ioe) { - LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress()); + LOG.warn("Error writing reply back to " + peer.getRemoteAddressString()); } IOUtils.closeStream(proxyOut); IOUtils.closeStream(blockReceiver); @@ -871,7 +870,7 @@ class DataXceiver extends Receiver imple } - private void checkAccess(DataOutputStream out, final boolean reply, + private void checkAccess(OutputStream out, final boolean reply, final ExtendedBlock blk, final Token<BlockTokenIdentifier> t, final Op op, @@ -886,11 +885,6 @@ class DataXceiver extends Receiver imple } catch(InvalidToken e) { try { if (reply) { - if (out == null) { - out = new DataOutputStream( - NetUtils.getOutputStream(s, dnConf.socketWriteTimeout)); - } - BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder() .setStatus(ERROR_ACCESS_TOKEN); if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1430507&r1=1430506&r2=1430507&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Tue Jan 8 20:44:09 2013 @@ -18,18 +18,16 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.IOException; -import java.net.ServerSocket; -import java.net.Socket; import java.net.SocketTimeoutException; import java.nio.channels.AsynchronousCloseException; -import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; import java.util.Set; import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.net.PeerServer; import org.apache.hadoop.hdfs.server.balancer.Balancer; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; @@ -45,11 +43,9 @@ import org.apache.hadoop.util.Daemon; class DataXceiverServer implements Runnable { public static final Log LOG = DataNode.LOG; - ServerSocket ss; - DataNode datanode; - // Record all sockets opened for data transfer - Set<Socket> childSockets = Collections.synchronizedSet( - new HashSet<Socket>()); + private final PeerServer peerServer; + private final DataNode datanode; + private final Set<Peer> peers = new HashSet<Peer>(); /** * Maximal number of concurrent xceivers per node. @@ -109,10 +105,10 @@ class DataXceiverServer implements Runna long estimateBlockSize; - DataXceiverServer(ServerSocket ss, Configuration conf, + DataXceiverServer(PeerServer peerServer, Configuration conf, DataNode datanode) { - this.ss = ss; + this.peerServer = peerServer; this.datanode = datanode; this.maxXceiverCount = @@ -130,12 +126,10 @@ class DataXceiverServer implements Runna @Override public void run() { + Peer peer = null; while (datanode.shouldRun) { - Socket s = null; try { - s = ss.accept(); - s.setTcpNoDelay(true); - // Timeouts are set within DataXceiver.run() + peer = peerServer.accept(); // Make sure the xceiver count is not exceeded int curXceiverCount = datanode.getXceiverCount(); @@ -146,7 +140,7 @@ class DataXceiverServer implements Runna } new Daemon(datanode.threadGroup, - DataXceiver.create(s, datanode, this)) + DataXceiver.create(peer, datanode, this)) .start(); } catch (SocketTimeoutException ignored) { // wake up to see if should continue to run @@ -157,10 +151,10 @@ class DataXceiverServer implements Runna LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace); } } catch (IOException ie) { - IOUtils.closeSocket(s); + IOUtils.cleanup(null, peer); LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ie); } catch (OutOfMemoryError ie) { - IOUtils.closeSocket(s); + IOUtils.cleanup(null, peer); // DataNode can run out of memory if there is too many transfers. // Log the event, Sleep for 30 seconds, other transfers may complete by // then. @@ -176,33 +170,35 @@ class DataXceiverServer implements Runna datanode.shouldRun = false; } } + synchronized (this) { + for (Peer p : peers) { + IOUtils.cleanup(LOG, p); + } + } try { - ss.close(); + peerServer.close(); } catch (IOException ie) { LOG.warn(datanode.getDisplayName() + " :DataXceiverServer: close exception", ie); } } - + void kill() { assert datanode.shouldRun == false : "shoudRun should be set to false before killing"; try { - this.ss.close(); + this.peerServer.close(); } catch (IOException ie) { LOG.warn(datanode.getDisplayName() + ":DataXceiverServer.kill(): ", ie); } + } + + synchronized void addPeer(Peer peer) { + peers.add(peer); + } - // close all the sockets that were accepted earlier - synchronized (childSockets) { - for (Iterator<Socket> it = childSockets.iterator(); - it.hasNext();) { - Socket thissock = it.next(); - try { - thissock.close(); - } catch (IOException e) { - } - } - } + synchronized void closePeer(Peer peer) { + peers.remove(peer); + IOUtils.cleanup(null, peer); } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1430507&r1=1430506&r2=1430507&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Tue Jan 8 20:44:09 2013 @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -559,13 +560,13 @@ public class NamenodeFsck { blockReader = BlockReaderFactory.newBlockReader( new BlockReaderFactory.Params(new Conf(conf)). - setSocket(s).setBlock(block). + setPeer(TcpPeerServer.peerFromSocketAndKey(s, + namenode.getRpcServer().getDataEncryptionKey())). + setBlock(block). setFile(BlockReaderFactory.getFileName(targetAddr, block.getBlockPoolId(), block.getBlockId())). setBlockToken(lblock.getBlockToken()). - setEncryptionKey(namenode.getRpcServer().getDataEncryptionKey()). - setLen(-1)); - + setDatanodeID(chosenNode)); } catch (IOException ex) { // Put chosen node into dead list, continue LOG.info("Failed to connect to " + targetAddr + ":" + ex); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1430507&r1=1430506&r2=1430507&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Tue Jan 8 20:44:09 2013 @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.CommonConfig import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSClient.Conf; +import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -152,13 +153,13 @@ public class BlockReaderTestUtil { return BlockReaderFactory.newBlockReader( new BlockReaderFactory.Params(new Conf(conf)). - setSocket(sock). + setPeer(TcpPeerServer.peerFromSocket(sock)). setFile(targetAddr.toString() + ":" + block.getBlockId()). setBlock(block).setBlockToken(testBlock.getBlockToken()). setStartOffset(offset).setLen(lenToRead). setBufferSize(conf.getInt( CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096)). - setVerifyChecksum(true)); + setVerifyChecksum(true).setDatanodeID(nodes[0])); } /** Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1430507&r1=1430506&r2=1430507&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java Tue Jan 8 20:44:09 2013 @@ -61,7 +61,7 @@ public class TestClientBlockVerification util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024)); util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true); verify(reader).sendReadResult(Status.CHECKSUM_OK); - reader.close(); + reader.close(null); } /** @@ -76,7 +76,7 @@ public class TestClientBlockVerification // We asked the blockreader for the whole file, and only read // half of it, so no CHECKSUM_OK verify(reader, never()).sendReadResult(Status.CHECKSUM_OK); - reader.close(); + reader.close(null); } /** @@ -92,7 +92,7 @@ public class TestClientBlockVerification // And read half the file util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true); verify(reader).sendReadResult(Status.CHECKSUM_OK); - reader.close(); + reader.close(null); } /** @@ -111,7 +111,7 @@ public class TestClientBlockVerification util.getBlockReader(testBlock, startOffset, length)); util.readAndCheckEOS(reader, length, true); verify(reader).sendReadResult(Status.CHECKSUM_OK); - reader.close(); + reader.close(null); } } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java?rev=1430507&r1=1430506&r2=1430507&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java Tue Jan 8 20:44:09 2013 @@ -18,28 +18,20 @@ package org.apache.hadoop.hdfs; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.spy; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.Socket; -import java.security.PrivilegedExceptionAction; + +import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.security.token.Token; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Matchers; import org.mockito.Mockito; @@ -55,59 +47,31 @@ public class TestConnCache { static final int BLOCK_SIZE = 4096; static final int FILE_SIZE = 3 * BLOCK_SIZE; - final static int CACHE_SIZE = 4; - final static long CACHE_EXPIRY_MS = 200; - static Configuration conf = null; - static MiniDFSCluster cluster = null; - static FileSystem fs = null; - static SocketCache cache; - - static final Path testFile = new Path("/testConnCache.dat"); - static byte authenticData[] = null; - - static BlockReaderTestUtil util = null; - /** * A mock Answer to remember the BlockReader used. * * It verifies that all invocation to DFSInputStream.getBlockReader() - * use the same socket. + * use the same peer. */ private class MockGetBlockReader implements Answer<RemoteBlockReader2> { public RemoteBlockReader2 reader = null; - private Socket sock = null; + private Peer peer = null; @Override public RemoteBlockReader2 answer(InvocationOnMock invocation) throws Throwable { RemoteBlockReader2 prevReader = reader; reader = (RemoteBlockReader2) invocation.callRealMethod(); - if (sock == null) { - sock = reader.dnSock; + if (peer == null) { + peer = reader.getPeer(); } else if (prevReader != null) { - assertSame("DFSInputStream should use the same socket", - sock, reader.dnSock); + Assert.assertSame("DFSInputStream should use the same peer", + peer, reader.getPeer()); } return reader; } } - @BeforeClass - public static void setupCluster() throws Exception { - final int REPLICATION_FACTOR = 1; - - /* create a socket cache. There is only one socket cache per jvm */ - cache = SocketCache.getInstance(CACHE_SIZE, CACHE_EXPIRY_MS); - - util = new BlockReaderTestUtil(REPLICATION_FACTOR); - cluster = util.getCluster(); - conf = util.getConf(); - fs = cluster.getFileSystem(); - - authenticData = util.writeFile(testFile, FILE_SIZE / 1024); - } - - /** * (Optionally) seek to position, read and verify data. * @@ -117,9 +81,10 @@ public class TestConnCache { long pos, byte[] buffer, int offset, - int length) + int length, + byte[] authenticData) throws IOException { - assertTrue("Test buffer too small", buffer.length >= offset + length); + Assert.assertTrue("Test buffer too small", buffer.length >= offset + length); if (pos >= 0) in.seek(pos); @@ -129,7 +94,7 @@ public class TestConnCache { while (length > 0) { int cnt = in.read(buffer, offset, length); - assertTrue("Error in read", cnt > 0); + Assert.assertTrue("Error in read", cnt > 0); offset += cnt; length -= cnt; } @@ -145,115 +110,22 @@ public class TestConnCache { } /** - * Test the SocketCache itself. - */ - @Test - public void testSocketCache() throws Exception { - // Make a client - InetSocketAddress nnAddr = - new InetSocketAddress("localhost", cluster.getNameNodePort()); - DFSClient client = new DFSClient(nnAddr, conf); - - // Find out the DN addr - LocatedBlock block = - client.getNamenode().getBlockLocations( - testFile.toString(), 0, FILE_SIZE) - .getLocatedBlocks().get(0); - DataNode dn = util.getDataNode(block); - InetSocketAddress dnAddr = dn.getXferAddress(); - - - // Make some sockets to the DN - Socket[] dnSockets = new Socket[CACHE_SIZE]; - for (int i = 0; i < dnSockets.length; ++i) { - dnSockets[i] = client.socketFactory.createSocket( - dnAddr.getAddress(), dnAddr.getPort()); - } - - - // Insert a socket to the NN - Socket nnSock = new Socket(nnAddr.getAddress(), nnAddr.getPort()); - cache.put(nnSock, null); - assertSame("Read the write", nnSock, cache.get(nnAddr).sock); - cache.put(nnSock, null); - - // Insert DN socks - for (Socket dnSock : dnSockets) { - cache.put(dnSock, null); - } - - assertEquals("NN socket evicted", null, cache.get(nnAddr)); - assertTrue("Evicted socket closed", nnSock.isClosed()); - - // Lookup the DN socks - for (Socket dnSock : dnSockets) { - assertEquals("Retrieve cached sockets", dnSock, cache.get(dnAddr).sock); - dnSock.close(); - } - - assertEquals("Cache is empty", 0, cache.size()); - } - - - /** - * Test the SocketCache expiry. - * Verify that socket cache entries expire after the set - * expiry time. - */ - @Test - public void testSocketCacheExpiry() throws Exception { - // Make a client - InetSocketAddress nnAddr = - new InetSocketAddress("localhost", cluster.getNameNodePort()); - DFSClient client = new DFSClient(nnAddr, conf); - - // Find out the DN addr - LocatedBlock block = - client.getNamenode().getBlockLocations( - testFile.toString(), 0, FILE_SIZE) - .getLocatedBlocks().get(0); - DataNode dn = util.getDataNode(block); - InetSocketAddress dnAddr = dn.getXferAddress(); - - - // Make some sockets to the DN and put in cache - Socket[] dnSockets = new Socket[CACHE_SIZE]; - for (int i = 0; i < dnSockets.length; ++i) { - dnSockets[i] = client.socketFactory.createSocket( - dnAddr.getAddress(), dnAddr.getPort()); - cache.put(dnSockets[i], null); - } - - // Client side still has the sockets cached - assertEquals(CACHE_SIZE, client.socketCache.size()); - - //sleep for a second and see if it expired - Thread.sleep(CACHE_EXPIRY_MS + 1000); - - // Client side has no sockets cached - assertEquals(0, client.socketCache.size()); - - //sleep for another second and see if - //the daemon thread runs fine on empty cache - Thread.sleep(CACHE_EXPIRY_MS + 1000); - } - - - /** * Read a file served entirely from one DN. Seek around and read from * different offsets. And verify that they all use the same socket. - * - * @throws java.io.IOException + * @throws Exception */ @Test @SuppressWarnings("unchecked") - public void testReadFromOneDN() throws IOException { - LOG.info("Starting testReadFromOneDN()"); + public void testReadFromOneDN() throws Exception { + BlockReaderTestUtil util = new BlockReaderTestUtil(1, + new HdfsConfiguration()); + final Path testFile = new Path("/testConnCache.dat"); + byte authenticData[] = util.writeFile(testFile, FILE_SIZE / 1024); DFSClient client = new DFSClient( - new InetSocketAddress("localhost", cluster.getNameNodePort()), conf); - DFSInputStream in = spy(client.open(testFile.toString())); + new InetSocketAddress("localhost", + util.getCluster().getNameNodePort()), util.getConf()); + DFSInputStream in = Mockito.spy(client.open(testFile.toString())); LOG.info("opened " + testFile.toString()); - byte[] dataBuf = new byte[BLOCK_SIZE]; MockGetBlockReader answer = new MockGetBlockReader(); @@ -270,18 +142,15 @@ public class TestConnCache { Matchers.anyString()); // Initial read - pread(in, 0, dataBuf, 0, dataBuf.length); + pread(in, 0, dataBuf, 0, dataBuf.length, authenticData); // Read again and verify that the socket is the same - pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length); - pread(in, 1024, dataBuf, 0, dataBuf.length); - pread(in, -1, dataBuf, 0, dataBuf.length); // No seek; just read - pread(in, 64, dataBuf, 0, dataBuf.length / 2); + pread(in, FILE_SIZE - dataBuf.length, dataBuf, 0, dataBuf.length, + authenticData); + pread(in, 1024, dataBuf, 0, dataBuf.length, authenticData); + // No seek; just read + pread(in, -1, dataBuf, 0, dataBuf.length, authenticData); + pread(in, 64, dataBuf, 0, dataBuf.length / 2, authenticData); in.close(); } - - @AfterClass - public static void teardownCluster() throws Exception { - util.shutdown(); - } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java?rev=1430507&r1=1430506&r2=1430507&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferKeepalive.java Tue Jan 8 20:44:09 2013 @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FSDataInputS import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; +import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -92,13 +93,13 @@ public class TestDataTransferKeepalive { DFSTestUtil.createFile(fs, TEST_FILE, 1L, (short)1, 0L); // Clients that write aren't currently re-used. - assertEquals(0, dfsClient.socketCache.size()); + assertEquals(0, dfsClient.peerCache.size()); assertXceiverCount(0); // Reads the file, so we should get a // cached socket, and should have an xceiver on the other side. DFSTestUtil.readFile(fs, TEST_FILE); - assertEquals(1, dfsClient.socketCache.size()); + assertEquals(1, dfsClient.peerCache.size()); assertXceiverCount(1); // Sleep for a bit longer than the keepalive timeout @@ -109,13 +110,13 @@ public class TestDataTransferKeepalive { // The socket is still in the cache, because we don't // notice that it's closed until we try to read // from it again. - assertEquals(1, dfsClient.socketCache.size()); + assertEquals(1, dfsClient.peerCache.size()); // Take it out of the cache - reading should // give an EOF. - Socket s = dfsClient.socketCache.get(dnAddr).sock; - assertNotNull(s); - assertEquals(-1, NetUtils.getInputStream(s).read()); + Peer peer = dfsClient.peerCache.get(dn.getDatanodeId()); + assertNotNull(peer); + assertEquals(-1, peer.getInputStream().read()); } /** @@ -174,14 +175,14 @@ public class TestDataTransferKeepalive { } DFSClient client = ((DistributedFileSystem)fs).dfs; - assertEquals(5, client.socketCache.size()); + assertEquals(5, client.peerCache.size()); // Let all the xceivers timeout Thread.sleep(1500); assertXceiverCount(0); // Client side still has the sockets cached - assertEquals(5, client.socketCache.size()); + assertEquals(5, client.peerCache.size()); // Reading should not throw an exception. DFSTestUtil.readFile(fs, TEST_FILE); Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java?rev=1430507&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDisableConnCache.java Tue Jan 8 20:44:09 2013 @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import static org.junit.Assert.assertEquals; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Test; + +/** + * This class tests disabling client connection caching in a single node + * mini-cluster. + */ +public class TestDisableConnCache { + static final Log LOG = LogFactory.getLog(TestDisableConnCache.class); + + static final int BLOCK_SIZE = 4096; + static final int FILE_SIZE = 3 * BLOCK_SIZE; + + /** + * Test that the socket cache can be disabled by setting the capacity to + * 0. Regression test for HDFS-3365. + * @throws Exception + */ + @Test + public void testDisableCache() throws Exception { + HdfsConfiguration confWithoutCache = new HdfsConfiguration(); + // Configure a new instance with no peer caching, ensure that it doesn't + // cache anything + confWithoutCache.setInt( + DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY, 0); + BlockReaderTestUtil util = new BlockReaderTestUtil(1, confWithoutCache); + final Path testFile = new Path("/testConnCache.dat"); + util.writeFile(testFile, FILE_SIZE / 1024); + FileSystem fsWithoutCache = FileSystem.newInstance(util.getConf()); + try { + DFSTestUtil.readFile(fsWithoutCache, testFile); + assertEquals(0, ((DistributedFileSystem)fsWithoutCache).dfs.peerCache.size()); + } finally { + fsWithoutCache.close(); + util.shutdown(); + } + } +} \ No newline at end of file Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java?rev=1430507&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPeerCache.java Tue Jan 8 20:44:09 2013 @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.ReadableByteChannel; +import java.util.HashSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.net.Peer; +import org.junit.Test; + +public class TestPeerCache { + static final Log LOG = LogFactory.getLog(TestPeerCache.class); + + private static final int CAPACITY = 3; + private static final int EXPIRY_PERIOD = 20; + private static PeerCache cache = + PeerCache.getInstance(CAPACITY, EXPIRY_PERIOD); + + private static class FakePeer implements Peer { + private boolean closed = false; + + private DatanodeID dnId; + + public FakePeer(DatanodeID dnId) { + this.dnId = dnId; + } + + @Override + public ReadableByteChannel getInputStreamChannel() { + throw new UnsupportedOperationException(); + } + + @Override + public void setReadTimeout(int timeoutMs) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int getReceiveBufferSize() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getTcpNoDelay() throws IOException { + return false; + } + + @Override + public void setWriteTimeout(int timeoutMs) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isClosed() { + return closed; + } + + @Override + public void close() throws IOException { + closed = true; + } + + @Override + public String getRemoteAddressString() { + return dnId.getInfoAddr(); + } + + @Override + public String getLocalAddressString() { + return "127.0.0.1:123"; + } + + @Override + public InputStream getInputStream() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public OutputStream getOutputStream() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isLocal() { + return true; + } + + @Override + public String toString() { + return "FakePeer(dnId=" + dnId + ")"; + } + } + + @Test + public void testAddAndRetrieve() throws Exception { + DatanodeID dnId = new DatanodeID("192.168.0.1", + "fakehostname", "fake_storage_id", + 100, 101, 102); + FakePeer peer = new FakePeer(dnId); + cache.put(dnId, peer); + assertTrue(!peer.isClosed()); + assertEquals(1, cache.size()); + assertEquals(peer, cache.get(dnId)); + assertEquals(0, cache.size()); + cache.clear(); + } + + @Test + public void testExpiry() throws Exception { + DatanodeID dnIds[] = new DatanodeID[CAPACITY]; + FakePeer peers[] = new FakePeer[CAPACITY]; + for (int i = 0; i < CAPACITY; ++i) { + dnIds[i] = new DatanodeID("192.168.0.1", + "fakehostname_" + i, "fake_storage_id", + 100, 101, 102); + peers[i] = new FakePeer(dnIds[i]); + } + for (int i = 0; i < CAPACITY; ++i) { + cache.put(dnIds[i], peers[i]); + } + // Check that the peers are cached + assertEquals(CAPACITY, cache.size()); + + // Wait for the peers to expire + Thread.sleep(EXPIRY_PERIOD * 50); + assertEquals(0, cache.size()); + + // make sure that the peers were closed when they were expired + for (int i = 0; i < CAPACITY; ++i) { + assertTrue(peers[i].isClosed()); + } + + // sleep for another second and see if + // the daemon thread runs fine on empty cache + Thread.sleep(EXPIRY_PERIOD * 50); + cache.clear(); + } + + @Test + public void testEviction() throws Exception { + DatanodeID dnIds[] = new DatanodeID[CAPACITY + 1]; + FakePeer peers[] = new FakePeer[CAPACITY + 1]; + for (int i = 0; i < dnIds.length; ++i) { + dnIds[i] = new DatanodeID("192.168.0.1", + "fakehostname_" + i, "fake_storage_id_" + i, + 100, 101, 102); + peers[i] = new FakePeer(dnIds[i]); + } + for (int i = 0; i < CAPACITY; ++i) { + cache.put(dnIds[i], peers[i]); + } + // Check that the peers are cached + assertEquals(CAPACITY, cache.size()); + + // Add another entry and check that the first entry was evicted + cache.put(dnIds[CAPACITY], peers[CAPACITY]); + assertEquals(CAPACITY, cache.size()); + assertSame(null, cache.get(dnIds[0])); + + // Make sure that the other entries are still there + for (int i = 1; i < CAPACITY; ++i) { + Peer peer = cache.get(dnIds[i]); + assertSame(peers[i], peer); + assertTrue(!peer.isClosed()); + peer.close(); + } + assertEquals(1, cache.size()); + cache.clear(); + } + + @Test + public void testMultiplePeersWithSameDnId() throws Exception { + DatanodeID dnId = new DatanodeID("192.168.0.1", + "fakehostname", "fake_storage_id", + 100, 101, 102); + HashSet<FakePeer> peers = new HashSet<FakePeer>(CAPACITY); + for (int i = 0; i < CAPACITY; ++i) { + FakePeer peer = new FakePeer(dnId); + peers.add(peer); + cache.put(dnId, peer); + } + // Check that all of the peers ended up in the cache + assertEquals(CAPACITY, cache.size()); + while (!peers.isEmpty()) { + Peer peer = cache.get(dnId); + assertTrue(peer != null); + assertTrue(!peer.isClosed()); + peers.remove(peer); + } + assertEquals(0, cache.size()); + cache.clear(); + } +} Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1430507&r1=1430506&r2=1430507&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Tue Jan 8 20:44:09 2013 @@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.DFSClient. import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -147,9 +148,10 @@ public class TestBlockTokenWithDFS { "test-blockpoolid", block.getBlockId()); blockReader = BlockReaderFactory.newBlockReader( new BlockReaderFactory.Params(new Conf(conf)). - setSocket(s).setBlock(block).setFile(file). + setPeer(TcpPeerServer.peerFromSocket(s)). + setBlock(block).setFile(file). setBlockToken(lblock.getBlockToken()).setStartOffset(0). - setLen(-1)); + setLen(-1).setDatanodeID(nodes[0])); } catch (IOException ex) { if (ex instanceof InvalidBlockTokenException) { assertFalse("OP_READ_BLOCK: access token is invalid, " Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=1430507&r1=1430506&r2=1430507&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Tue Jan 8 20:44:09 2013 @@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.DFSConfigK import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.net.TcpPeerServer; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -284,8 +285,9 @@ public class TestDataNodeVolumeFailure { setFile(BlockReaderFactory.getFileName(targetAddr, "test-blockpoolid", block.getBlockId())). setBlock(block).setBlockToken(lblock.getBlockToken()). - setSocket(s)); - blockReader.close(); + setPeer(TcpPeerServer.peerFromSocket(s)). + setDatanodeID(datanode)); + blockReader.close(null); } /**
