Hello Lei,

There is a known bug in 2.4.0 that can cause hedged reads to hang.  I fixed
it in HDFS-6231:

https://issues.apache.org/jira/browse/HDFS-6231

This patch will be included in the forthcoming 2.4.1 release.  I'm curious
to see if applying this patch fixes the problem for you.  Can you try it
and let us know?  Thank you!

Chris Nauroth
Hortonworks
http://hortonworks.com/



On Thu, Jun 5, 2014 at 8:34 PM, lei liu <liulei...@gmail.com> wrote:

> I use hadoop2.4.
>
> When I use "hedged read", If there is only one live datanode, the reading
> from  the datanode throw TimeoutException and ChecksumException., the
> Client will infinite wait.
>
> Example below test case:
>   @Test
>   public void testException() throws IOException, InterruptedException,
> ExecutionException {
>     Configuration conf = new Configuration();
>     int numHedgedReadPoolThreads = 5;
>     final int hedgedReadTimeoutMillis = 50;
>     conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
> numHedgedReadPoolThreads);
>     conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
>       hedgedReadTimeoutMillis);
>     // Set up the InjectionHandler
>     DFSClientFaultInjector.instance =
> Mockito.mock(DFSClientFaultInjector.class);
>     DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
>     // make preads ChecksumException
>     Mockito.doAnswer(new Answer<Void>() {
>       @Override
>       public Void answer(InvocationOnMock invocation) throws Throwable {
>         if(true) {
>           Thread.sleep(hedgedReadTimeoutMillis + 10);
>           throw new ChecksumException("test", 100);
>         }
>         return null;
>       }
>     }*).when(injector).fetchFromDatanodeException();*
>
>     MiniDFSCluster cluster = new
> MiniDFSCluster.Builder(conf).numDataNodes(3).format(true).build();
>     DistributedFileSystem fileSys = cluster.getFileSystem();
>     DFSClient dfsClient = fileSys.getClient();
>     DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
>
>     try {
>       Path file = new Path("/hedgedReadException.dat");
>       FSDataOutputStream  output = fileSys.create(file,(short)1);
>       byte[] data = new byte[64 * 1024];
>       output.write(data);
>       output.flush();
>       output.write(data);
>       output.flush();
>       output.write(data);
>       output.flush();
>       output.close();
>       byte[] buffer = new byte[64 * 1024];
>       FSDataInputStream  input = fileSys.open(file);
>       input.read(0, buffer, 0, 1024);
>       input.close();
>       assertTrue(metrics.getHedgedReadOps() == 1);
>       assertTrue(metrics.getHedgedReadWins() == 1);
>     } finally {
>       fileSys.close();
>       cluster.shutdown();
>       Mockito.reset(injector);
>     }
>   }
>
>
> *The code of actualGetFromOneDataNode() method call
> **fetchFromDatanodeException()
> method as below:*
>       try {
>         *DFSClientFaultInjector.get().fetchFromDatanodeException();*
>         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
>         int len = (int) (end - start + 1);
>         reader = new BlockReaderFactory(dfsClient.getConf()).
>             setInetSocketAddress(targetAddr).
>             setRemotePeerFactory(dfsClient).
>             setDatanodeInfo(chosenNode).
>             setFileName(src).
>             setBlock(block.getBlock()).
>             setBlockToken(blockToken).
>             setStartOffset(start).
>             setVerifyChecksum(verifyChecksum).
>             setClientName(dfsClient.clientName).
>             setLength(len).
>             setCachingStrategy(curCachingStrategy).
>             setAllowShortCircuitLocalReads(allowShortCircuitLocalReads).
>             setClientCacheContext(dfsClient.getClientContext()).
>             setUserGroupInformation(dfsClient.ugi).
>             setConfiguration(dfsClient.getConfiguration()).
>             build();
>         int nread = reader.readAll(buf, offset, len);
>         if (nread != len) {
>           throw new IOException("truncated return from reader.read(): " +
>                                 "excpected " + len + ", got " + nread);
>         }
>         return;
>       } catch (ChecksumException e) {
>         String msg = "fetchBlockByteRange(). Got a checksum exception for "
>             + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
>             + chosenNode;
>         DFSClient.LOG.warn(msg);
>         // we want to remember what we have tried
>         addIntoCorruptedBlockMap(block.getBlock(), chosenNode,
> corruptedBlockMap);
>         addToDeadNodes(chosenNode);
>         throw new IOException(msg);
>       }
>

-- 
CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to 
which it is addressed and may contain information that is confidential, 
privileged and exempt from disclosure under applicable law. If the reader 
of this message is not the intended recipient, you are hereby notified that 
any printing, copying, dissemination, distribution, disclosure or 
forwarding of this communication is strictly prohibited. If you have 
received this communication in error, please contact the sender immediately 
and delete it from your system. Thank You.

Reply via email to