Repository: hadoop Updated Branches: refs/heads/trunk 53959e69f -> 28bebc81d
HDFS-7999. FsDatasetImpl#createTemporary sometimes holds the FSDatasetImpl lock for a very long time (sinago via cmccabe) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/28bebc81 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/28bebc81 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/28bebc81 Branch: refs/heads/trunk Commit: 28bebc81db8bb6d1bc2574de7564fe4c595cfe09 Parents: 53959e6 Author: Colin Patrick Mccabe <[email protected]> Authored: Mon Apr 6 08:54:46 2015 -0700 Committer: Colin Patrick Mccabe <[email protected]> Committed: Mon Apr 6 08:56:52 2015 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../datanode/fsdataset/impl/FsDatasetImpl.java | 77 +++++++++++++------- 2 files changed, 52 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/28bebc81/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6fafec8..52325a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1379,6 +1379,9 @@ Release 2.7.0 - UNRELEASED HDFS-8051. FsVolumeList#addVolume should release volume reference if not put it into BlockScanner. (Lei (Eddy) Xu via Colin P. McCabe) + HDFS-7999. FsDatasetImpl#createTemporary sometimes holds the FSDatasetImpl + lock for a very long time (sinago via cmccabe) + BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS HDFS-7720. Quota by Storage Type API, tools and ClientNameNode http://git-wip-us.apache.org/repos/asf/hadoop/blob/28bebc81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index f15f649..6bcbe5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1412,38 +1412,59 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } @Override // FsDatasetSpi - public synchronized ReplicaHandler createTemporary( + public ReplicaHandler createTemporary( StorageType storageType, ExtendedBlock b) throws IOException { - ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId()); - if (replicaInfo != null) { - if (replicaInfo.getGenerationStamp() < b.getGenerationStamp() - && replicaInfo instanceof ReplicaInPipeline) { - // Stop the previous writer - ((ReplicaInPipeline)replicaInfo) - .stopWriter(datanode.getDnConf().getXceiverStopTimeout()); - invalidate(b.getBlockPoolId(), new Block[]{replicaInfo}); - } else { - throw new ReplicaAlreadyExistsException("Block " + b + - " already exists in state " + replicaInfo.getState() + - " and thus cannot be created."); + long startTimeMs = Time.monotonicNow(); + long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout(); + ReplicaInfo lastFoundReplicaInfo = null; + do { + synchronized (this) { + ReplicaInfo currentReplicaInfo = + volumeMap.get(b.getBlockPoolId(), b.getBlockId()); + if (currentReplicaInfo == lastFoundReplicaInfo) { + if (lastFoundReplicaInfo != null) { + invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo }); + } + FsVolumeReference ref = + volumes.getNextVolume(storageType, b.getNumBytes()); + FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); + // create a temporary file to hold block in the designated volume + File f; + try { + f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); + } catch (IOException e) { + IOUtils.cleanup(null, ref); + throw e; + } + ReplicaInPipeline newReplicaInfo = + new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v, + f.getParentFile(), 0); + volumeMap.add(b.getBlockPoolId(), newReplicaInfo); + return new ReplicaHandler(newReplicaInfo, ref); + } else { + if (!(currentReplicaInfo.getGenerationStamp() < b + .getGenerationStamp() && currentReplicaInfo instanceof ReplicaInPipeline)) { + throw new ReplicaAlreadyExistsException("Block " + b + + " already exists in state " + currentReplicaInfo.getState() + + " and thus cannot be created."); + } + lastFoundReplicaInfo = currentReplicaInfo; + } } - } - FsVolumeReference ref = volumes.getNextVolume(storageType, b.getNumBytes()); - FsVolumeImpl v = (FsVolumeImpl) ref.getVolume(); - // create a temporary file to hold block in the designated volume - File f; - try { - f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock()); - } catch (IOException e) { - IOUtils.cleanup(null, ref); - throw e; - } + // Hang too long, just bail out. This is not supposed to happen. + long writerStopMs = Time.monotonicNow() - startTimeMs; + if (writerStopMs > writerStopTimeoutMs) { + LOG.warn("Unable to stop existing writer for block " + b + " after " + + writerStopMs + " miniseconds."); + throw new IOException("Unable to stop existing writer for block " + b + + " after " + writerStopMs + " miniseconds."); + } - ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), - b.getGenerationStamp(), v, f.getParentFile(), 0); - volumeMap.add(b.getBlockPoolId(), newReplicaInfo); - return new ReplicaHandler(newReplicaInfo, ref); + // Stop the previous writer + ((ReplicaInPipeline) lastFoundReplicaInfo) + .stopWriter(writerStopTimeoutMs); + } while (true); } /**
