HDFS-10468. HDFS read ends up ignoring an interrupt. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/be34e85e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/be34e85e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/be34e85e Branch: refs/heads/HDFS-7240 Commit: be34e85e682880f46eee0310bf00ecc7d39cd5bd Parents: c14c1b2 Author: Jing Zhao <ji...@apache.org> Authored: Tue Jun 7 10:48:21 2016 -0700 Committer: Jing Zhao <ji...@apache.org> Committed: Tue Jun 7 10:48:21 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSInputStream.java | 36 ++++++-- .../java/org/apache/hadoop/hdfs/TestRead.java | 87 ++++++++++++++++++++ .../server/datanode/SimulatedFSDataset.java | 4 +- 3 files changed, 119 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/be34e85e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 2ed0abd..7f32a56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs; import java.io.EOFException; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; @@ -304,7 +306,7 @@ public class DFSInputStream extends FSInputStream try { 
Thread.sleep(waitTime); } catch (InterruptedException e) { - throw new IOException( + throw new InterruptedIOException( "Interrupted while getting the last block length."); } } @@ -379,6 +381,7 @@ public class DFSInputStream extends FSInputStream return n; } } catch (IOException ioe) { + checkInterrupted(ioe); if (ioe instanceof RemoteException) { if (((RemoteException) ioe).unwrapRemoteException() instanceof ReplicaNotFoundException) { @@ -414,7 +417,8 @@ public class DFSInputStream extends FSInputStream try { Thread.sleep(500); // delay between retries. } catch (InterruptedException e) { - throw new IOException("Interrupted while getting the length."); + throw new InterruptedIOException( + "Interrupted while getting the length."); } } @@ -660,6 +664,7 @@ public class DFSInputStream extends FSInputStream } return chosenNode; } catch (IOException ex) { + checkInterrupted(ex); if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { DFSClient.LOG.info("Will fetch a new encryption key and retry, " + "encryption key was invalid when connecting to " + targetAddr @@ -681,6 +686,15 @@ public class DFSInputStream extends FSInputStream } } + private void checkInterrupted(IOException e) throws IOException { + if (Thread.currentThread().isInterrupted() && + (e instanceof ClosedByInterruptException || + e instanceof InterruptedIOException)) { + DFSClient.LOG.debug("The reading thread has been interrupted.", e); + throw e; + } + } + protected BlockReader getBlockReader(LocatedBlock targetBlock, long offsetInBlock, long length, InetSocketAddress targetAddr, StorageType storageType, DatanodeInfo datanode) throws IOException { @@ -948,6 +962,7 @@ public class DFSInputStream extends FSInputStream } catch (ChecksumException ce) { throw ce; } catch (IOException e) { + checkInterrupted(e); if (retries == 1) { DFSClient.LOG.warn("DFS Read", e); } @@ -1044,9 +1059,12 @@ public class DFSInputStream extends FSInputStream // expanding time window for each failure 
timeWindow * (failures + 1) * ThreadLocalRandom.current().nextDouble(); - DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec."); + DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + + " IOException, will wait for " + waitTime + " msec."); Thread.sleep((long)waitTime); - } catch (InterruptedException ignored) { + } catch (InterruptedException e) { + throw new InterruptedIOException( + "Interrupted while choosing DataNode for read."); } deadNodes.clear(); //2nd option is to remove only nodes[blockId] openInfo(true); @@ -1140,7 +1158,8 @@ public class DFSInputStream extends FSInputStream buf, offset, corruptedBlocks); return; } catch (IOException e) { - // Ignore. Already processed inside the function. + checkInterrupted(e); // check if the read has been interrupted + // Ignore other IOException. Already processed inside the function. // Loop through to try the next node. } } @@ -1218,6 +1237,7 @@ public class DFSInputStream extends FSInputStream addToDeadNodes(datanode.info); throw new IOException(msg); } catch (IOException e) { + checkInterrupted(e); if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { DFSClient.LOG.info("Will fetch a new encryption key and retry, " + "encryption key was invalid when connecting to " + datanode.addr @@ -1306,8 +1326,11 @@ public class DFSInputStream extends FSInputStream ignored.add(chosenNode.info); dfsClient.getHedgedReadMetrics().incHedgedReadOps(); // continue; no need to refresh block locations - } catch (InterruptedException | ExecutionException e) { + } catch (ExecutionException e) { // Ignore + } catch (InterruptedException e) { + throw new InterruptedIOException( + "Interrupted while waiting for reading task"); } } else { // We are starting up a 'hedged' read. 
We have a read already @@ -1594,6 +1617,7 @@ public class DFSInputStream extends FSInputStream } catch (IOException e) {//make following read to retry DFSClient.LOG.debug("Exception while seek to {} from {} of {} from " + "{}", targetPos, getCurrentBlock(), src, currentNode, e); + checkInterrupted(e); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/be34e85e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java index 9d38fd7..974fdf8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRead.java @@ -19,9 +19,19 @@ package org.apache.hadoop.hdfs; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataStorage; +import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.io.IOUtils; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -103,4 +113,81 @@ public class TestRead { cluster.shutdown(); } } + + @Test(timeout=60000) + public void testInterruptReader() throws Exception { + final Configuration conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY, + 
DelayedSimulatedFSDataset.Factory.class.getName()); + + final MiniDFSCluster cluster = new MiniDFSCluster + .Builder(conf).numDataNodes(1).build(); + final DistributedFileSystem fs = cluster.getFileSystem(); + try { + cluster.waitActive(); + final Path file = new Path("/foo"); + DFSTestUtil.createFile(fs, file, 1024, (short) 1, 0L); + + final FSDataInputStream in = fs.open(file); + AtomicBoolean readInterrupted = new AtomicBoolean(false); + final Thread reader = new Thread(new Runnable() { + @Override + public void run() { + try { + in.read(new byte[1024], 0, 1024); + } catch (IOException e) { + if (e instanceof ClosedByInterruptException || + e instanceof InterruptedIOException) { + readInterrupted.set(true); + } + } + } + }); + + reader.start(); + Thread.sleep(1000); + reader.interrupt(); + reader.join(); + + Assert.assertTrue(readInterrupted.get()); + } finally { + cluster.shutdown(); + } + } + + private static class DelayedSimulatedFSDataset extends SimulatedFSDataset { + private volatile boolean isDelayed = true; + + DelayedSimulatedFSDataset(DataNode datanode, DataStorage storage, + Configuration conf) { + super(datanode, storage, conf); + } + + @Override + public synchronized InputStream getBlockInputStream(ExtendedBlock b, + long seekOffset) throws IOException { + while (isDelayed) { + try { + this.wait(); + } catch (InterruptedException ignored) { + } + } + InputStream result = super.getBlockInputStream(b); + IOUtils.skipFully(result, seekOffset); + return result; + } + + static class Factory extends FsDatasetSpi.Factory<DelayedSimulatedFSDataset> { + @Override + public DelayedSimulatedFSDataset newInstance(DataNode datanode, + DataStorage storage, Configuration conf) throws IOException { + return new DelayedSimulatedFSDataset(datanode, storage, conf); + } + + @Override + public boolean isSimulated() { + return true; + } + } + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/be34e85e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 1fdedca..25034c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -960,8 +960,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { return new ReplicaHandler(binfo, null); } - synchronized InputStream getBlockInputStream(ExtendedBlock b - ) throws IOException { + protected synchronized InputStream getBlockInputStream(ExtendedBlock b) + throws IOException { final Map<Block, BInfo> map = getMap(b.getBlockPoolId()); BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null) { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org