I am using Hadoop 2.4.

When I use "hedged read", If there is only one live datanode, the reading
from  the datanode throw TimeoutException and ChecksumException., the
Client will infinite wait.
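
To make the hang concrete, below is a minimal, self-contained sketch of the
pattern as I understand it (simplified, with my own names; this is not the
actual DFSInputStream code). The hedged-read loop blocks on a completion
service; once the only pending read has failed and been drained, and the lone
datanode has been marked dead, nothing new is ever submitted, so the next
take() can never return. Running it hangs by design:

  import java.io.IOException;
  import java.util.concurrent.Callable;
  import java.util.concurrent.CompletionService;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorCompletionService;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;
  import java.util.concurrent.TimeUnit;

  public class HedgedReadHangSketch {
    public static void main(String[] args) throws Exception {
      final int hedgedReadTimeoutMillis = 50;
      ExecutorService pool = Executors.newFixedThreadPool(2);
      CompletionService<byte[]> hedgedService =
          new ExecutorCompletionService<byte[]>(pool);

      // The read from the only live datanode: it sleeps past the hedge
      // threshold and then fails, like the injected ChecksumException.
      hedgedService.submit(new Callable<byte[]>() {
        @Override
        public byte[] call() throws Exception {
          Thread.sleep(hedgedReadTimeoutMillis + 10);
          throw new IOException("injected checksum failure");
        }
      });

      // The first wait hits the hedged-read threshold and times out
      // (first == null); the client would now hedge to another datanode,
      // but there is none left to try.
      Future<byte[]> first =
          hedgedService.poll(hedgedReadTimeoutMillis, TimeUnit.MILLISECONDS);

      // The failed read completes and is drained ...
      try {
        hedgedService.take().get();
      } catch (ExecutionException expected) {
        // the injected failure surfaces here; the node is marked dead
      }

      // ... leaving no pending futures and no node to resubmit to, so
      // this take() blocks forever: the "infinite wait".
      hedgedService.take();
    }
  }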

The following test case reproduces the problem:
  @Test
  public void testException() throws IOException, InterruptedException,
      ExecutionException {
    Configuration conf = new Configuration();
    int numHedgedReadPoolThreads = 5;
    final int hedgedReadTimeoutMillis = 50;
    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
        numHedgedReadPoolThreads);
    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
        hedgedReadTimeoutMillis);
    // Set up the InjectionHandler
    DFSClientFaultInjector.instance =
        Mockito.mock(DFSClientFaultInjector.class);
    DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
    // make every pread sleep past the hedged-read threshold, then fail
    // with a ChecksumException
    Mockito.doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        Thread.sleep(hedgedReadTimeoutMillis + 10);
        throw new ChecksumException("test", 100);
      }
    }).when(injector).fetchFromDatanodeException();

    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(3).format(true).build();
    DistributedFileSystem fileSys = cluster.getFileSystem();
    DFSClient dfsClient = fileSys.getClient();
    DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();

    try {
      Path file = new Path("/hedgedReadException.dat");
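      // replication 1: the block is stored on only one of the 3 datanodes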
      FSDataOutputStream output = fileSys.create(file, (short) 1);
      byte[] data = new byte[64 * 1024];
      output.write(data);
      output.flush();
      output.write(data);
      output.flush();
      output.write(data);
      output.flush();
      output.close();
      byte[] buffer = new byte[64 * 1024];
      FSDataInputStream input = fileSys.open(file);
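      // with only one replica and the injected failure, this pread
      // never returns on 2.4: the client waits forever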
      input.read(0, buffer, 0, 1024);
      input.close();
      assertTrue(metrics.getHedgedReadOps() == 1);
      assertTrue(metrics.getHedgedReadWins() == 1);
    } finally {
      fileSys.close();
      cluster.shutdown();
      Mockito.reset(injector);
    }
  }


The actualGetFromOneDataNode() method calls fetchFromDatanodeException()
as shown below:
      try {
        DFSClientFaultInjector.get().fetchFromDatanodeException();
        Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
        int len = (int) (end - start + 1);
        reader = new BlockReaderFactory(dfsClient.getConf()).
            setInetSocketAddress(targetAddr).
            setRemotePeerFactory(dfsClient).
            setDatanodeInfo(chosenNode).
            setFileName(src).
            setBlock(block.getBlock()).
            setBlockToken(blockToken).
            setStartOffset(start).
            setVerifyChecksum(verifyChecksum).
            setClientName(dfsClient.clientName).
            setLength(len).
            setCachingStrategy(curCachingStrategy).
            setAllowShortCircuitLocalReads(allowShortCircuitLocalReads).
            setClientCacheContext(dfsClient.getClientContext()).
            setUserGroupInformation(dfsClient.ugi).
            setConfiguration(dfsClient.getConfiguration()).
            build();
        int nread = reader.readAll(buf, offset, len);
        if (nread != len) {
          throw new IOException("truncated return from reader.read(): " +
                                "excpected " + len + ", got " + nread);
        }
        return;
      } catch (ChecksumException e) {
        String msg = "fetchBlockByteRange(). Got a checksum exception for "
            + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
            + chosenNode;
        DFSClient.LOG.warn(msg);
        // we want to remember what we have tried
        addIntoCorruptedBlockMap(block.getBlock(), chosenNode,
            corruptedBlockMap);
        addToDeadNodes(chosenNode);
        throw new IOException(msg);
      }
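
Because the file is written with replication 1, chosenNode above is the only
datanode holding the block. The ChecksumException path marks it dead, after
which no retry, hedged or otherwise, can find a node to read from; as far as
I can tell this is what leaves the client waiting forever instead of failing
the read.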
