Hi Chris,

I reviewed the patch, and I think there is a problem in it.

For example, suppose there are two hedged-read futures: if the first future
to return is a failure, then the second future will be cancelled, even though
it might still succeed.
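The race can be sketched outside HDFS with plain java.util.concurrent. This is only an illustration of the concern, not the DFSInputStream code; the class name, callables, and messages are made up. The first future fails immediately, the second would succeed shortly after, and a cancel-on-first-failure policy discards the good result:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class HedgedCancelDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        List<Future<String>> futures = new ArrayList<>();

        // The first read fails almost immediately.
        futures.add(cs.submit(() -> {
            throw new IOException("checksum error on first read");
        }));
        // The hedged read would succeed a little later.
        futures.add(cs.submit(() -> {
            Thread.sleep(100);
            return "block data";
        }));

        Future<String> done = cs.take(); // first future to complete: the failed one
        try {
            System.out.println("read returned: " + done.get());
        } catch (ExecutionException e) {
            // Cancelling everything on the first failure (the behavior in
            // question) throws away the read that was about to succeed.
            for (Future<String> f : futures) {
                f.cancel(true);
            }
            System.out.println("first future failed; cancelled the rest");
        }
        pool.shutdown();
    }
}
```

A policy that waits for the remaining futures before giving up would instead return "block data" here.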


2014-06-07 3:44 GMT+08:00 Chris Nauroth <cnaur...@hortonworks.com>:

> Hello Lei,
>
> There is a known bug in 2.4.0 that can cause hedged reads to hang.  I fixed
> it in HDFS-6231:
>
> https://issues.apache.org/jira/browse/HDFS-6231
>
> This patch will be included in the forthcoming 2.4.1 release.  I'm curious
> to see if applying this patch fixes the problem for you.  Can you try it
> and let us know?  Thank you!
>
> Chris Nauroth
> Hortonworks
> http://hortonworks.com/
>
>
>
> On Thu, Jun 5, 2014 at 8:34 PM, lei liu <liulei...@gmail.com> wrote:
>
> > I am using Hadoop 2.4.
> >
> > When I use hedged reads and there is only one live datanode, if the read
> > from that datanode throws a TimeoutException and a ChecksumException, the
> > client waits forever.
> >
> > For example, consider the test case below:
> >   @Test
> >   public void testException() throws IOException, InterruptedException,
> > ExecutionException {
> >     Configuration conf = new Configuration();
> >     int numHedgedReadPoolThreads = 5;
> >     final int hedgedReadTimeoutMillis = 50;
> >     conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
> >         numHedgedReadPoolThreads);
> >     conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
> >         hedgedReadTimeoutMillis);
> >     // Set up the InjectionHandler
> >     DFSClientFaultInjector.instance =
> > Mockito.mock(DFSClientFaultInjector.class);
> >     DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
> >     // make preads throw ChecksumException after the hedged-read timeout
> >     Mockito.doAnswer(new Answer<Void>() {
> >       @Override
> >       public Void answer(InvocationOnMock invocation) throws Throwable {
> >         Thread.sleep(hedgedReadTimeoutMillis + 10);
> >         throw new ChecksumException("test", 100);
> >       }
> >     }).when(injector).fetchFromDatanodeException();
> >
> >     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
> >         .numDataNodes(3).format(true).build();
> >     DistributedFileSystem fileSys = cluster.getFileSystem();
> >     DFSClient dfsClient = fileSys.getClient();
> >     DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
> >
> >     try {
> >       Path file = new Path("/hedgedReadException.dat");
> >       FSDataOutputStream output = fileSys.create(file, (short) 1);
> >       byte[] data = new byte[64 * 1024];
> >       output.write(data);
> >       output.flush();
> >       output.write(data);
> >       output.flush();
> >       output.write(data);
> >       output.flush();
> >       output.close();
> >       byte[] buffer = new byte[64 * 1024];
> >       FSDataInputStream input = fileSys.open(file);
> >       input.read(0, buffer, 0, 1024);
> >       input.close();
> >       assertTrue(metrics.getHedgedReadOps() == 1);
> >       assertTrue(metrics.getHedgedReadWins() == 1);
> >     } finally {
> >       fileSys.close();
> >       cluster.shutdown();
> >       Mockito.reset(injector);
> >     }
> >   }
> >
> >
> > The code in the actualGetFromOneDataNode() method that calls
> > fetchFromDatanodeException() is shown below:
> >       try {
> >         DFSClientFaultInjector.get().fetchFromDatanodeException();
> >         Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
> >         int len = (int) (end - start + 1);
> >         reader = new BlockReaderFactory(dfsClient.getConf()).
> >             setInetSocketAddress(targetAddr).
> >             setRemotePeerFactory(dfsClient).
> >             setDatanodeInfo(chosenNode).
> >             setFileName(src).
> >             setBlock(block.getBlock()).
> >             setBlockToken(blockToken).
> >             setStartOffset(start).
> >             setVerifyChecksum(verifyChecksum).
> >             setClientName(dfsClient.clientName).
> >             setLength(len).
> >             setCachingStrategy(curCachingStrategy).
> >             setAllowShortCircuitLocalReads(allowShortCircuitLocalReads).
> >             setClientCacheContext(dfsClient.getClientContext()).
> >             setUserGroupInformation(dfsClient.ugi).
> >             setConfiguration(dfsClient.getConfiguration()).
> >             build();
> >         int nread = reader.readAll(buf, offset, len);
> >         if (nread != len) {
> >           throw new IOException("truncated return from reader.read(): " +
> >                                 "expected " + len + ", got " + nread);
> >         }
> >         return;
> >       } catch (ChecksumException e) {
> >         String msg = "fetchBlockByteRange(). Got a checksum exception for "
> >             + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
> >             + chosenNode;
> >         DFSClient.LOG.warn(msg);
> >         // we want to remember what we have tried
> >         addIntoCorruptedBlockMap(block.getBlock(), chosenNode,
> > corruptedBlockMap);
> >         addToDeadNodes(chosenNode);
> >         throw new IOException(msg);
> >       }
> >
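To make the hang concrete: if every attempt ends in the catch block above (and with one live datanode, that node is then marked dead), no future ever produces a result, so a bare take() on the completion service blocks forever. One way to bound the wait, sketched here with plain java.util.concurrent and hypothetical names (this is not the DFSInputStream code or the HDFS-6231 patch), is to never wait for more results than were submitted and to surface the last failure:

```java
import java.io.IOException;
import java.util.concurrent.*;

public class HedgedDrainDemo {
    // Wait for at most `submitted` results; if all of them fail, throw the
    // error instead of calling take() again and blocking forever.
    static String firstSuccessful(CompletionService<String> cs, int submitted)
            throws IOException, InterruptedException {
        ExecutionException last = null;
        for (int i = 0; i < submitted; i++) {
            try {
                return cs.take().get();
            } catch (ExecutionException e) {
                last = e; // remember the failure and keep draining
            }
        }
        throw new IOException("all hedged reads failed", last);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        cs.submit(() -> { throw new IOException("checksum error from the only datanode"); });
        cs.submit(() -> { throw new IOException("timeout from hedged attempt"); });
        try {
            firstSuccessful(cs, 2);
        } catch (IOException e) {
            System.out.println("failed fast instead of hanging: " + e.getMessage());
        }
        pool.shutdown();
    }
}
```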
>
