Hi Chris,

I reviewed the patch, and I think there is a problem with it.
For example, suppose there are two futures: if the first future to return has failed, the second future is then cancelled.

2014-06-07 3:44 GMT+08:00 Chris Nauroth <cnaur...@hortonworks.com>:

> Hello Lei,
>
> There is a known bug in 2.4.0 that can cause hedged reads to hang. I fixed
> it in HDFS-6231:
>
> https://issues.apache.org/jira/browse/HDFS-6231
>
> This patch will be included in the forthcoming 2.4.1 release. I'm curious
> to see if applying this patch fixes the problem for you. Can you try it
> and let us know? Thank you!
>
> Chris Nauroth
> Hortonworks
> http://hortonworks.com/
>
> On Thu, Jun 5, 2014 at 8:34 PM, lei liu <liulei...@gmail.com> wrote:
>
> > I use Hadoop 2.4.
> >
> > When I use "hedged read" and there is only one live datanode, if the read
> > from that datanode throws TimeoutException and ChecksumException, the
> > client waits forever.
> >
> > Example test case:
> >
> > @Test
> > public void testException() throws IOException, InterruptedException,
> >     ExecutionException {
> >   Configuration conf = new Configuration();
> >   int numHedgedReadPoolThreads = 5;
> >   final int hedgedReadTimeoutMillis = 50;
> >   conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
> >       numHedgedReadPoolThreads);
> >   conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
> >       hedgedReadTimeoutMillis);
> >   // Set up the InjectionHandler
> >   DFSClientFaultInjector.instance =
> >       Mockito.mock(DFSClientFaultInjector.class);
> >   DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
> >   // make preads throw ChecksumException
> >   Mockito.doAnswer(new Answer<Void>() {
> >     @Override
> >     public Void answer(InvocationOnMock invocation) throws Throwable {
> >       if(true) {
> >         Thread.sleep(hedgedReadTimeoutMillis + 10);
> >         throw new ChecksumException("test", 100);
> >       }
> >       return null;
> >     }
> >   }).when(injector).fetchFromDatanodeException();
> >
> >   MiniDFSCluster cluster = new
> >       MiniDFSCluster.Builder(conf).numDataNodes(3).format(true).build();
> >   DistributedFileSystem fileSys = cluster.getFileSystem();
> >   DFSClient dfsClient = fileSys.getClient();
> >   DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
> >
> >   try {
> >     Path file = new Path("/hedgedReadException.dat");
> >     FSDataOutputStream output = fileSys.create(file,(short)1);
> >     byte[] data = new byte[64 * 1024];
> >     output.write(data);
> >     output.flush();
> >     output.write(data);
> >     output.flush();
> >     output.write(data);
> >     output.flush();
> >     output.close();
> >     byte[] buffer = new byte[64 * 1024];
> >     FSDataInputStream input = fileSys.open(file);
> >     input.read(0, buffer, 0, 1024);
> >     input.close();
> >     assertTrue(metrics.getHedgedReadOps() == 1);
> >     assertTrue(metrics.getHedgedReadWins() == 1);
> >   } finally {
> >     fileSys.close();
> >     cluster.shutdown();
> >     Mockito.reset(injector);
> >   }
> > }
> >
> > The actualGetFromOneDataNode() method calls fetchFromDatanodeException()
> > as shown below:
> >
> > try {
> >   DFSClientFaultInjector.get().fetchFromDatanodeException();
> >   Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
> >   int len = (int) (end - start + 1);
> >   reader = new BlockReaderFactory(dfsClient.getConf()).
> >       setInetSocketAddress(targetAddr).
> >       setRemotePeerFactory(dfsClient).
> >       setDatanodeInfo(chosenNode).
> >       setFileName(src).
> >       setBlock(block.getBlock()).
> >       setBlockToken(blockToken).
> >       setStartOffset(start).
> >       setVerifyChecksum(verifyChecksum).
> >       setClientName(dfsClient.clientName).
> >       setLength(len).
> >       setCachingStrategy(curCachingStrategy).
> >       setAllowShortCircuitLocalReads(allowShortCircuitLocalReads).
> >       setClientCacheContext(dfsClient.getClientContext()).
> >       setUserGroupInformation(dfsClient.ugi).
> >       setConfiguration(dfsClient.getConfiguration()).
> >       build();
> >   int nread = reader.readAll(buf, offset, len);
> >   if (nread != len) {
> >     throw new IOException("truncated return from reader.read(): " +
> >         "excpected " + len + ", got " + nread);
> >   }
> >   return;
> > } catch (ChecksumException e) {
> >   String msg = "fetchBlockByteRange(). Got a checksum exception for "
> >       + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
> >       + chosenNode;
> >   DFSClient.LOG.warn(msg);
> >   // we want to remember what we have tried
> >   addIntoCorruptedBlockMap(block.getBlock(), chosenNode,
> >       corruptedBlockMap);
> >   addToDeadNodes(chosenNode);
> >   throw new IOException(msg);
> > }
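
To make the concern about the two futures more concrete, here is a minimal standalone sketch. It is not the DFSInputStream code and does not reproduce the HDFS-6231 patch: the class HedgedFutureSketch, the method hedgedRead and its cancelOthersOnFailure flag are hypothetical names used only for illustration, and the two callables merely stand in for reads of the same block from two datanodes. It assumes the first future to complete is a failure while the slower one would have succeeded.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class HedgedFutureSketch {

  // Hypothetical stand-ins for two reads of the same data from different nodes.
  static List<Callable<String>> reads() {
    List<Callable<String>> reads = new ArrayList<>();
    // First read fails immediately, so its future completes first.
    reads.add(() -> { throw new IOException("simulated checksum error"); });
    // Second read is slower but would succeed.
    reads.add(() -> { Thread.sleep(200); return "data"; });
    return reads;
  }

  // Returns the first successful result, or null if no result was obtained.
  // If cancelOthersOnFailure is true, the first failed future ends the wait
  // and the remaining futures are cancelled without being consulted.
  static String hedgedRead(boolean cancelOthersOnFailure)
      throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CompletionService<String> service = new ExecutorCompletionService<>(pool);
    List<Future<String>> pending = new ArrayList<>();
    try {
      for (Callable<String> read : reads()) {
        pending.add(service.submit(read));
      }
      while (!pending.isEmpty()) {
        Future<String> done = service.take(); // blocks until one completes
        pending.remove(done);
        try {
          return done.get();                  // success: use this result
        } catch (ExecutionException e) {
          if (cancelOthersOnFailure) {
            return null;                      // give up on the first failure
          }
          // Otherwise keep waiting for the next completed future.
        }
      }
      return null;                            // every read failed
    } finally {
      for (Future<String> f : pending) {
        f.cancel(true);                       // cancel whatever is still running
      }
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(hedgedRead(true));
    System.out.println(hedgedRead(false));
  }
}
```

With cancelOthersOnFailure set to true, the call gives up as soon as the failed future is taken, even though the slower read would have succeeded. With false, it keeps waiting and returns the slower read's result. Whether the patched client behaves like the first case is the question raised at the top of this mail; the sketch only shows the pattern.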