I use Hadoop 2.4. When I use "hedged read" and there is only one live datanode, the read from that datanode throws a TimeoutException and a ChecksumException, and the client then waits forever.
An example test case:

{code:java}
@Test
public void testException() throws IOException, InterruptedException, ExecutionException {
  Configuration conf = new Configuration();
  int numHedgedReadPoolThreads = 5;
  final int hedgedReadTimeoutMillis = 50;
  conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
      numHedgedReadPoolThreads);
  conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
      hedgedReadTimeoutMillis);
  // Set up the InjectionHandler
  DFSClientFaultInjector.instance = Mockito.mock(DFSClientFaultInjector.class);
  DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
  // make preads throw ChecksumException
  Mockito.doAnswer(new Answer<Void>() {
    @Override
    public Void answer(InvocationOnMock invocation) throws Throwable {
      if (true) {
        Thread.sleep(hedgedReadTimeoutMillis + 10);
        throw new ChecksumException("test", 100);
      }
      return null;
    }
  }).when(injector).fetchFromDatanodeException();

  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
      .format(true).build();
  DistributedFileSystem fileSys = cluster.getFileSystem();
  DFSClient dfsClient = fileSys.getClient();
  DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
  try {
    Path file = new Path("/hedgedReadException.dat");
    FSDataOutputStream output = fileSys.create(file, (short) 1);
    byte[] data = new byte[64 * 1024];
    output.write(data);
    output.flush();
    output.write(data);
    output.flush();
    output.write(data);
    output.flush();
    output.close();
    byte[] buffer = new byte[64 * 1024];
    FSDataInputStream input = fileSys.open(file);
    input.read(0, buffer, 0, 1024);
    input.close();
    assertTrue(metrics.getHedgedReadOps() == 1);
    assertTrue(metrics.getHedgedReadWins() == 1);
  } finally {
    fileSys.close();
    cluster.shutdown();
    Mockito.reset(injector);
  }
}
{code}

The actualGetFromOneDataNode() method calls fetchFromDatanodeException() as shown below:

{code:java}
try {
  DFSClientFaultInjector.get().fetchFromDatanodeException();
  Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
  int len = (int) (end - start + 1);
  reader = new BlockReaderFactory(dfsClient.getConf()).
      setInetSocketAddress(targetAddr).
      setRemotePeerFactory(dfsClient).
      setDatanodeInfo(chosenNode).
      setFileName(src).
      setBlock(block.getBlock()).
      setBlockToken(blockToken).
      setStartOffset(start).
      setVerifyChecksum(verifyChecksum).
      setClientName(dfsClient.clientName).
      setLength(len).
      setCachingStrategy(curCachingStrategy).
      setAllowShortCircuitLocalReads(allowShortCircuitLocalReads).
      setClientCacheContext(dfsClient.getClientContext()).
      setUserGroupInformation(dfsClient.ugi).
      setConfiguration(dfsClient.getConfiguration()).
      build();
  int nread = reader.readAll(buf, offset, len);
  if (nread != len) {
    throw new IOException("truncated return from reader.read(): " +
        "excpected " + len + ", got " + nread);
  }
  return;
} catch (ChecksumException e) {
  String msg = "fetchBlockByteRange(). Got a checksum exception for "
      + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
      + chosenNode;
  DFSClient.LOG.warn(msg);
  // we want to remember what we have tried
  addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
  addToDeadNodes(chosenNode);
  throw new IOException(msg);
}
{code}
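To make the failure mode concrete, below is a minimal, self-contained sketch (my own illustration, not the actual DFSInputStream code; the class and variable names are hypothetical) of how a hedged-read style loop that only logs per-attempt failures can end up waiting on a completion service with no chance of success when the single remaining datanode always fails:

{code:java}
import java.util.concurrent.*;

public class HedgedReadHangSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CompletionService<byte[]> hedgedService = new ExecutorCompletionService<>(pool);

    // Hypothetical "read from the only live datanode": always fails after the
    // hedged-read threshold, like the injected ChecksumException in the test above.
    Callable<byte[]> readFromOnlyNode = () -> {
      Thread.sleep(60);
      throw new java.io.IOException("checksum error on the only replica");
    };

    hedgedService.submit(readFromOnlyNode);   // original read attempt
    hedgedService.submit(readFromOnlyNode);   // hedged attempt against the same node

    int outstanding = 2;
    while (true) {                            // "wait for the first successful attempt"
      Future<byte[]> done = hedgedService.take();
      try {
        byte[] result = done.get();           // never succeeds in this scenario
        System.out.println("read " + result.length + " bytes");
        break;
      } catch (ExecutionException e) {
        // The failure is only logged; once every submitted future has failed and
        // there is no other datanode to try, the next take() would block forever.
        System.err.println("attempt failed: " + e.getCause());
        if (--outstanding == 0) {
          // The sketch breaks out here so it terminates; a client that keeps
          // waiting (or resubmits against the same dead node) hangs instead.
          System.err.println("no attempts left - a real client would now hang");
          break;
        }
      }
    }
    pool.shutdownNow();
  }
}
{code}

This is only a model of the pattern, but it matches the observed behavior: the ChecksumException above marks the only datanode dead and is rethrown, yet nothing ever surfaces the failure to the caller, so the read never returns.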