I am using Hadoop 2.4. With "hedged read" enabled, if there is only one live datanode and the read from that datanode throws a TimeoutException or ChecksumException, the client will wait forever. The test case below reproduces the problem:
@Test
public void testException() throws IOException, InterruptedException,
    ExecutionException {
  Configuration conf = new Configuration();
  int numHedgedReadPoolThreads = 5;
  final int hedgedReadTimeoutMillis = 50;
  conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
      numHedgedReadPoolThreads);
  conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
      hedgedReadTimeoutMillis);
  // Set up the fault injector so that every pread from a datanode first
  // sleeps past the hedged-read threshold and then fails with a
  // ChecksumException.
  DFSClientFaultInjector.instance =
      Mockito.mock(DFSClientFaultInjector.class);
  DFSClientFaultInjector injector = DFSClientFaultInjector.instance;
  Mockito.doAnswer(new Answer<Void>() {
    @Override
    public Void answer(InvocationOnMock invocation) throws Throwable {
      // if (true) keeps the compiler from flagging the trailing return
      // as unreachable.
      if (true) {
        Thread.sleep(hedgedReadTimeoutMillis + 10);
        throw new ChecksumException("test", 100);
      }
      return null;
    }
  }).when(injector).fetchFromDatanodeException();

  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(3).format(true).build();
  DistributedFileSystem fileSys = cluster.getFileSystem();
  DFSClient dfsClient = fileSys.getClient();
  DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
  try {
    // Write a small file with replication 1, so only one datanode can
    // serve the read.
    Path file = new Path("/hedgedReadException.dat");
    FSDataOutputStream output = fileSys.create(file, (short) 1);
    byte[] data = new byte[64 * 1024];
    output.write(data);
    output.flush();
    output.write(data);
    output.flush();
    output.write(data);
    output.flush();
    output.close();

    // This pread never returns: the only datanode fails and the client
    // waits forever.
    byte[] buffer = new byte[64 * 1024];
    FSDataInputStream input = fileSys.open(file);
    input.read(0, buffer, 0, 1024);
    input.close();
    assertTrue(metrics.getHedgedReadOps() == 1);
    assertTrue(metrics.getHedgedReadWins() == 1);
  } finally {
    fileSys.close();
    cluster.shutdown();
    Mockito.reset(injector);
  }
}
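
To illustrate why the read never returns, here is a minimal, self-contained sketch of the failure mode. This is a simplified model, not the actual DFSInputStream hedged-read code, and the class and method names in it (HedgedReadHangSketch, readFromOnlyDatanode) are made up for illustration: the hedged-read loop waits for the first future to succeed, but with a single live datanode the only future fails, there is no second node to hedge against, and no success ever arrives.

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class HedgedReadHangSketch {
  static final long HEDGE_TIMEOUT_MILLIS = 50;

  // Stands in for the read against the only live datanode: it blows the
  // hedge threshold and then fails, like the fault injector in the test.
  static Callable<byte[]> readFromOnlyDatanode() {
    return new Callable<byte[]>() {
      @Override
      public byte[] call() throws Exception {
        Thread.sleep(HEDGE_TIMEOUT_MILLIS + 10);
        throw new java.io.IOException("ChecksumException test");
      }
    };
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CompletionService<byte[]> hedgedService =
        new ExecutorCompletionService<byte[]>(pool);
    hedgedService.submit(readFromOnlyDatanode());
    int failedReads = 0;

    // Simplified wait loop: a failed future is simply discarded, and with
    // no other datanode to submit a hedged request to, nothing can ever
    // complete successfully. The real loop has no exit for this case; the
    // counter below exists only so this sketch terminates.
    while (true) {
      Future<byte[]> done = hedgedService.poll(
          HEDGE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
      if (done == null) {
        if (failedReads > 0) {
          System.out.println("all reads failed; the real client would "
              + "now wait here forever");
          break;
        }
        continue; // hedge threshold passed, but no second node to try
      }
      try {
        byte[] result = done.get();
        System.out.println("read " + result.length + " bytes");
        break;
      } catch (ExecutionException e) {
        failedReads++; // failure swallowed; no retry is scheduled
      }
    }
    pool.shutdownNow();
  }
}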
For reference, here is where the actualGetFromOneDataNode() method calls fetchFromDatanodeException():
try {
  DFSClientFaultInjector.get().fetchFromDatanodeException();
  Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
  int len = (int) (end - start + 1);
  reader = new BlockReaderFactory(dfsClient.getConf()).
      setInetSocketAddress(targetAddr).
      setRemotePeerFactory(dfsClient).
      setDatanodeInfo(chosenNode).
      setFileName(src).
      setBlock(block.getBlock()).
      setBlockToken(blockToken).
      setStartOffset(start).
      setVerifyChecksum(verifyChecksum).
      setClientName(dfsClient.clientName).
      setLength(len).
      setCachingStrategy(curCachingStrategy).
      setAllowShortCircuitLocalReads(allowShortCircuitLocalReads).
      setClientCacheContext(dfsClient.getClientContext()).
      setUserGroupInformation(dfsClient.ugi).
      setConfiguration(dfsClient.getConfiguration()).
      build();
  int nread = reader.readAll(buf, offset, len);
  if (nread != len) {
    throw new IOException("truncated return from reader.read(): " +
        "expected " + len + ", got " + nread);
  }
  return;
} catch (ChecksumException e) {
  String msg = "fetchBlockByteRange(). Got a checksum exception for "
      + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
      + chosenNode;
  DFSClient.LOG.warn(msg);
  // we want to remember what we have tried
  addIntoCorruptedBlockMap(block.getBlock(), chosenNode,
      corruptedBlockMap);
  addToDeadNodes(chosenNode);
  throw new IOException(msg);
}
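
For anyone reproducing this: DFSClientFaultInjector is a test-only hook whose methods are no-ops in production, and replacing its static instance with a Mockito mock (as in the test above) is what makes fetchFromDatanodeException() throw at this exact point in the read path. A rough sketch of the shape of such a hook follows; it is a simplified approximation, not a copy of the Hadoop class.

// Simplified shape of a client-side fault-injection hook. The real class
// is org.apache.hadoop.hdfs.DFSClientFaultInjector; this body is an
// illustrative approximation of it.
public class FaultInjectorSketch {
  // Tests swap this static instance for a mock to inject failures.
  public static FaultInjectorSketch instance = new FaultInjectorSketch();

  public static FaultInjectorSketch get() {
    return instance;
  }

  // No-op in production. A Mockito stub can override it to sleep and
  // throw, simulating a slow datanode followed by a checksum failure.
  public void fetchFromDatanodeException() throws java.io.IOException {
  }
}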