This is an automated email from the ASF dual-hosted git repository. shv pushed a commit to branch fgl in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 0e7d216134cc0c93b4ed5861430ef79ce0a9c7ab
Author: Konstantin V Shvachko <s...@apache.org>
AuthorDate: Fri May 7 17:47:37 2021 -0700

    INodeMap with PartitionedGSet and per-partition locking.
---
 .../java/org/apache/hadoop/util/LatchLock.java | 64 +++++
 .../org/apache/hadoop/util/PartitionedGSet.java | 263 +++++++++++++++++++++
 .../hadoop/hdfs/server/namenode/FSDirMkdirOp.java | 92 ++++++-
 .../hadoop/hdfs/server/namenode/FSDirectory.java | 2 +-
 .../hadoop/hdfs/server/namenode/FSImage.java | 29 ++-
 .../hadoop/hdfs/server/namenode/FSNamesystem.java | 9 +-
 .../hdfs/server/namenode/FSNamesystemLock.java | 96 +++++++-
 .../hadoop/hdfs/server/namenode/INodeMap.java | 148 ++++++++++--
 .../java/org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +
 .../hadoop/hdfs/server/namenode/TestINodeFile.java | 39 ++-
 10 files changed, 682 insertions(+), 62 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
new file mode 100644
index 0000000..41e33da
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+/**
+ * LatchLock controls two hierarchical Read/Write locks:
+ * the topLock and the childLock.
+ * Typically an operation starts with the topLock already acquired.
+ * To acquire the child lock, LatchLock will
+ * first acquire the childLock and then release the topLock.
+ */
+public abstract class LatchLock<C> {
+  // Interface methods to be defined by subclasses
+  /** @return true if the topLock is locked for read by any thread */
+  protected abstract boolean isReadTopLocked();
+  /** @return true if the topLock is locked for write by any thread */
+  protected abstract boolean isWriteTopLocked();
+  protected abstract void readTopUnlock();
+  protected abstract void writeTopUnlock();
+
+  protected abstract boolean hasReadChildLock();
+  protected abstract void readChildLock();
+  protected abstract void readChildUnlock();
+
+  protected abstract boolean hasWriteChildLock();
+  protected abstract void writeChildLock();
+  protected abstract void writeChildUnlock();
+
+  protected abstract LatchLock<C> clone();
+
+  // Public APIs to use with the class
+  public void readLock() {
+    readChildLock();
+    readTopUnlock();
+  }
+
+  public void readUnlock() {
+    readChildUnlock();
+  }
+
+  public void writeLock() {
+    writeChildLock();
+    writeTopUnlock();
+  }
+
+  public void writeUnlock() {
+    writeChildUnlock();
+  }
+}
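For orientation, here is a minimal, self-contained sketch (not part of this commit) of what a concrete LatchLock over two plain ReentrantReadWriteLocks could look like; the class name and wiring are hypothetical. It shows the hand-off the class comment describes: the inherited readLock()/writeLock() take the child lock first and only then release the top lock, so the structure is never observably unlocked.

import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.util.LatchLock;

// Hypothetical example, not from the patch: a LatchLock over two
// ReentrantReadWriteLocks. The top lock is shared by all partitions;
// clone() hands each new partition its own child lock.
class SimpleLatchLock extends LatchLock<ReentrantReadWriteLock> {
  private final ReentrantReadWriteLock topLock;
  private final ReentrantReadWriteLock childLock;

  SimpleLatchLock(ReentrantReadWriteLock top, ReentrantReadWriteLock child) {
    this.topLock = top;
    this.childLock = child;
  }

  @Override
  protected boolean isReadTopLocked() {
    return topLock.getReadLockCount() > 0;
  }

  @Override
  protected boolean isWriteTopLocked() {
    return topLock.isWriteLocked();
  }

  @Override
  protected void readTopUnlock() {
    topLock.readLock().unlock();
  }

  @Override
  protected void writeTopUnlock() {
    topLock.writeLock().unlock();
  }

  @Override
  protected boolean hasReadChildLock() {
    return childLock.getReadHoldCount() > 0;
  }

  @Override
  protected void readChildLock() {
    childLock.readLock().lock();
  }

  @Override
  protected void readChildUnlock() {
    childLock.readLock().unlock();
  }

  @Override
  protected boolean hasWriteChildLock() {
    return childLock.isWriteLockedByCurrentThread();
  }

  @Override
  protected void writeChildLock() {
    childLock.writeLock().lock();
  }

  @Override
  protected void writeChildUnlock() {
    childLock.writeLock().unlock();
  }

  @Override
  protected LatchLock<ReentrantReadWriteLock> clone() {
    // Same shared top lock, fresh child lock per partition.
    return new SimpleLatchLock(topLock, new ReentrantReadWriteLock());
  }
}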
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
new file mode 100644
index 0000000..4b0cdc9
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
+
+/**
+ * An implementation of {@link GSet}, which splits a collection of elements
+ * into partitions each corresponding to a range of keys.
+ *
+ * This class does not support null elements.
+ *
+ * This class is backed by a LatchLock for hierarchical synchronization.
+ *
+ * @param <K> Key type for looking up the elements
+ * @param <E> Element type, which must be
+ *       (1) a subclass of K, and
+ *       (2) implementing {@link LinkedElement} interface.
+ */
+@InterfaceAudience.Private
+public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
+
+  private static final int DEFAULT_PARTITION_CAPACITY = 2027;
+
+  /**
+   * An ordered map of contiguous segments of elements.
+   * Each key in the map represents the smallest key in the mapped segment,
+   * so that all elements in this segment are >= the mapping key,
+   * but are smaller than the next key in the map.
+   * Elements within a partition do not need to be ordered.
+   */
+  private final NavigableMap<K, PartitionEntry> partitions;
+  private LatchLock<?> latchLock;
+
+  /**
+   * The number of elements in the set.
+   */
+  protected volatile int size;
+
+  /**
+   * A single partition of the {@link PartitionedGSet}.
+   * Consists of a hash table {@link LightWeightGSet} and a lock, which
+   * controls access to this partition independently of the other ones.
+   */
+  private class PartitionEntry extends LightWeightGSet<K, E> {
+    private final LatchLock<?> partLock;
+
+    PartitionEntry(int defaultPartitionCapacity) {
+      super(defaultPartitionCapacity);
+      this.partLock = latchLock.clone();
+    }
+  }
+
+  public PartitionedGSet(final int capacity,
+      final Comparator<? super K> comparator,
+      final LatchLock<?> latchLock,
+      final E rootKey) {
+    this.partitions = new TreeMap<K, PartitionEntry>(comparator);
+    this.latchLock = latchLock;
+    addNewPartition(rootKey).put(rootKey);
+    this.size = 1;
+  }
+
+  /**
+   * Creates a new empty partition.
+   * @param key the smallest key intended for the new partition
+   * @return the newly created partition
+   */
+  private PartitionEntry addNewPartition(final K key) {
+    PartitionEntry lastPart = null;
+    if(size > 0)
+      lastPart = partitions.lastEntry().getValue();
+
+    PartitionEntry newPart =
+        new PartitionEntry(DEFAULT_PARTITION_CAPACITY);
+    // assert size == 0 || newPart.partLock.isWriteTopLocked() :
+    //    "Must hold write Lock: key = " + key;
+    partitions.put(key, newPart);
+
+    LOG.debug("Total GSet size = {}", size);
+    LOG.debug("Number of partitions = {}", partitions.size());
+    LOG.debug("Previous partition size = {}",
+        lastPart == null ? 0 : lastPart.size());
+
+    return newPart;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  protected PartitionEntry getPartition(final K key) {
+    Entry<K, PartitionEntry> partEntry = partitions.floorEntry(key);
+    if(partEntry == null) {
+      return null;
+    }
+    PartitionEntry part = partEntry.getValue();
+    if(part == null) {
+      throw new IllegalStateException("Null partition for key: " + key);
+    }
+    assert size == 0 || part.partLock.isReadTopLocked() ||
+        part.partLock.hasReadChildLock() : "Must hold read Lock: key = " + key;
+    return part;
+  }
+
+  @Override
+  public boolean contains(final K key) {
+    PartitionEntry part = getPartition(key);
+    if(part == null) {
+      return false;
+    }
+    return part.contains(key);
+  }
+
+  @Override
+  public E get(final K key) {
+    PartitionEntry part = getPartition(key);
+    if(part == null) {
+      return null;
+    }
+    LOG.debug("get key: {}", key);
+    // part.partLock.readLock();
+    return part.get(key);
+  }
+
+  @Override
+  public E put(final E element) {
+    K key = element;
+    PartitionEntry part = getPartition(key);
+    if(part == null) {
+      throw new HadoopIllegalArgumentException("Illegal key: " + key);
+    }
+    assert size == 0 || part.partLock.isWriteTopLocked() ||
+        part.partLock.hasWriteChildLock() :
+          "Must hold write Lock: key = " + key;
+    LOG.debug("put key: {}", key);
+    PartitionEntry newPart = addNewPartitionIfNeeded(part, key);
+    if(newPart != part) {
+      newPart.partLock.writeChildLock();
+      part = newPart;
+    }
+    E result = part.put(element);
+    if(result == null) {  // new element
+      size++;
+    }
+    return result;
+  }
+
+  private PartitionEntry addNewPartitionIfNeeded(
+      PartitionEntry curPart, K key) {
+    if(curPart.size() < DEFAULT_PARTITION_CAPACITY * 1.1
+        || curPart.contains(key)) {
+      return curPart;
+    }
+    return addNewPartition(key);
+  }
+
+  @Override
+  public E remove(final K key) {
+    PartitionEntry part = getPartition(key);
+    if(part == null) {
+      return null;
+    }
+    E result = part.remove(key);
+    if(result != null) {
+      size--;
+    }
+    return result;
+  }
+
+  @Override
+  public void clear() {
+    LOG.error("Total GSet size = {}", size);
+    LOG.error("Number of partitions = {}", partitions.size());
+    // assert latchLock.hasWriteTopLock() : "Must hold write topLock";
+    // SHV May need to clear all partitions?
+    partitions.clear();
+    size = 0;
+  }
+
+  @Override
+  public Collection<E> values() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public Iterator<E> iterator() {
+    return new EntryIterator();
+  }
+
+  /**
+   * Iterator over the elements in the set.
+   * Iterates first by keys, then inside the partition
+   * corresponding to the key.
+   *
+   * Modifications are tracked by the underlying collections. We allow
+   * modifying other partitions while iterating through the current one.
+   */
+  private class EntryIterator implements Iterator<E> {
+    private final Iterator<K> keyIterator;
+    private Iterator<E> partitionIterator;
+
+    public EntryIterator() {
+      keyIterator = partitions.keySet().iterator();
+      K curKey = partitions.firstKey();
+      partitionIterator = getPartition(curKey).iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+      if(partitionIterator.hasNext()) {
+        return true;
+      }
+      return keyIterator.hasNext();
+    }
+
+    @Override
+    public E next() {
+      if(!partitionIterator.hasNext()) {
+        K curKey = keyIterator.next();
+        partitionIterator = getPartition(curKey).iterator();
+      }
+      return partitionIterator.next();
+    }
+  }
+
+  public void latchWriteLock(K[] keys) {
+    // getPartition(parent).partLock.writeChildLock();
+    LatchLock<?> pLock = null;
+    for(K key : keys) {
+      pLock = getPartition(key).partLock;
+      pLock.writeChildLock();
+    }
+    assert pLock != null : "pLock is null";
+    pLock.writeTopUnlock();
+  }
+}
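The getPartition lookup in PartitionedGSet above hinges on NavigableMap.floorEntry: a key belongs to the partition whose starting key is the greatest one less than or equal to it. A tiny self-contained illustration with plain JDK types (the values are made up):

import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative only: floorEntry picks the partition whose starting key
// is the greatest one <= the lookup key.
public class FloorEntryDemo {
  public static void main(String[] args) {
    NavigableMap<Long, String> partitions = new TreeMap<>();
    partitions.put(1L, "partition starting at 1");       // covers keys 1..999
    partitions.put(1000L, "partition starting at 1000"); // covers keys >= 1000
    System.out.println(partitions.floorEntry(42L).getValue());   // starting at 1
    System.out.println(partitions.floorEntry(5000L).getValue()); // starting at 1000
    System.out.println(partitions.floorEntry(0L));               // null: below every range
  }
}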
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
index da324fb..c8c6277 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirMkdirOp.java
@@ -69,18 +69,18 @@ class FSDirMkdirOp {
       // create multiple inodes.
       fsn.checkFsObjectLimit();
 
-      // Ensure that the user can traversal the path by adding implicit
-      // u+wx permission to all ancestor directories.
-      INodesInPath existing =
-          createParentDirectories(fsd, iip, permissions, false);
-      if (existing != null) {
-        existing = createSingleDirectory(
-            fsd, existing, iip.getLastLocalName(), permissions);
+      // create all missing directories along the path,
+      // but don't add them to the INodeMap yet
+      permissions = addImplicitUwx(permissions, permissions); // SHV !!!
+      INode[] missing = createPathDirectories(fsd, iip, permissions);
+      iip = iip.getExistingINodes();
+      // switch the locks
+      fsd.getINodeMap().latchWriteLock(iip, missing);
+      // Add missing inodes to the INodeMap
+      for(INode dir : missing) {
+        iip = addSingleDirectory(fsd, iip, dir, permissions);
+        assert iip != null : "iip should not be null";
       }
-      if (existing == null) {
-        throw new IOException("Failed to create directory: " + src);
-      }
-      iip = existing;
     }
     return fsd.getAuditFileInfo(iip);
   } finally {
@@ -132,6 +132,7 @@ class FSDirMkdirOp {
     if (missing == 0) {  // full path exists, return parents.
       existing = iip.getParentINodesInPath();
     } else if (missing > 1) {  // need to create at least one ancestor dir.
+      FSNamesystem.LOG.error("missing = " + missing);
       // Ensure that the user can traversal the path by adding implicit
       // u+wx permission to all ancestor directories.
       PermissionStatus basePerm = inheritPerms
@@ -143,6 +144,13 @@
     for (int i = existing.length(); existing != null && i <= last; i++) {
       byte[] component = iip.getPathComponent(i);
       existing = createSingleDirectory(fsd, existing, component, perm);
+      if(existing == null) {
+        FSNamesystem.LOG.error("unprotectedMkdir returned null for "
+            + iip.getPath() + " for " + new String(component) + " i = " + i);
+        // Somebody already created the parent. Recalculate existing
+        existing = INodesInPath.resolve(fsd.getRoot(), iip.getPathComponents());
+        i = existing.length() - 1;
+      }
     }
   }
   return existing;
@@ -228,5 +236,67 @@
     }
     return iip;
   }
+
+  private static INode createDirectoryINode(FSDirectory fsd,
+      INodesInPath parent, byte[] name, PermissionStatus permission)
+      throws FileAlreadyExistsException {
+    assert fsd.hasReadLock();
+    assert parent.getLastINode() != null;
+    if (!parent.getLastINode().isDirectory()) {
+      throw new FileAlreadyExistsException("Parent path is not a directory: " +
+          parent.getPath() + " " + DFSUtil.bytes2String(name));
+    }
+    final INodeDirectory dir = new INodeDirectory(
+        fsd.allocateNewInodeId(), name, permission, now());
+    return dir;
+  }
+
+  private static INode[] createPathDirectories(FSDirectory fsd,
+      INodesInPath iip, PermissionStatus perm)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    INodesInPath existing = iip.getExistingINodes();
+    assert existing != null : "existing should not be null";
+    int numMissing = iip.length() - existing.length();
+    if (numMissing == 0) {  // full path exists
+      return new INode[0];
+    }
+
+    // create the missing directories along the path
+    INode[] missing = new INode[numMissing];
+    final int last = iip.length();
+    for (int i = existing.length(); i < last; i++) {
+      byte[] component = iip.getPathComponent(i);
+      missing[i - existing.length()] =
+          createDirectoryINode(fsd, existing, component, perm);
+    }
+    return missing;
+  }
+
+  private static INodesInPath addSingleDirectory(FSDirectory fsd,
+      INodesInPath existing, INode dir, PermissionStatus perm)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    INodesInPath iip = fsd.addLastINode(existing, dir, perm.getPermission(), true);
+    if (iip == null) {
+      FSNamesystem.LOG.debug("somebody already created {} on path {}", dir, existing.getPath());
+      final INodeDirectory parent = existing.getLastINode().asDirectory();
+      dir = parent.getChild(dir.getLocalNameBytes(), Snapshot.CURRENT_STATE_ID);
+      return INodesInPath.append(existing, dir, dir.getLocalNameBytes());
+    }
+    existing = iip;
+    assert dir.equals(existing.getLastINode()) : "dir is not the last INode";
+
+    // Directory creation also counts towards FilesCreated
+    // to match count of FilesDeleted metric.
+    NameNode.getNameNodeMetrics().incrFilesCreated();
+
+    assert dir.getPermissionStatus().getGroupName() != null :
+        "GroupName is null for " + existing.getPath();
+    String cur = existing.getPath();
+    fsd.getEditLog().logMkDir(cur, dir);
+    NameNode.stateChangeLog.debug("mkdirs: created directory {}", cur);
+    return existing;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 497aa84..b5e68a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -317,7 +317,7 @@ public class FSDirectory implements Closeable {
   FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
     this.inodeId = new INodeId();
     rootDir = createRoot(ns);
-    inodeMap = INodeMap.newInstance(rootDir);
+    inodeMap = INodeMap.newInstance(rootDir, ns);
     this.isPermissionEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,
         DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index f7749ce..1305438 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -177,18 +177,23 @@ public class FSImage implements Closeable {
   void format(FSNamesystem fsn, String clusterId,
       boolean force) throws IOException {
-    long fileCount = fsn.getFilesTotal();
-    // Expect 1 file, which is the root inode
-    Preconditions.checkState(fileCount == 1,
-        "FSImage.format should be called with an uninitialized namesystem, has " +
-        fileCount + " files");
-    NamespaceInfo ns = NNStorage.newNamespaceInfo();
-    LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
-    ns.clusterID = clusterId;
-
-    storage.format(ns);
-    editLog.formatNonFileJournals(ns, force);
-    saveFSImageInAllDirs(fsn, 0);
+    fsn.readLock();
+    try {
+      long fileCount = fsn.getFilesTotal();
+      // Expect 1 file, which is the root inode
+      Preconditions.checkState(fileCount == 1,
+          "FSImage.format should be called with an uninitialized namesystem, has " +
+          fileCount + " files");
+      NamespaceInfo ns = NNStorage.newNamespaceInfo();
+      LOG.info("Allocated new BlockPoolId: " + ns.getBlockPoolID());
+      ns.clusterID = clusterId;
+
+      storage.format(ns);
+      editLog.formatNonFileJournals(ns, force);
+      saveFSImageInAllDirs(fsn, 0);
+    } finally {
+      fsn.readUnlock();
+    }
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 7ccaae9..a8fb490 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1753,7 +1753,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   public void readUnlock(String opName,
       Supplier<String> lockReportInfoSupplier) {
-    this.fsLock.readUnlock(opName, lockReportInfoSupplier);
+    this.fsLock.readUnlock(opName, lockReportInfoSupplier, true);
   }
 
   @Override
@@ -1786,7 +1786,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   @Override
   public boolean hasWriteLock() {
-    return this.fsLock.isWriteLockedByCurrentThread();
+    return this.fsLock.isWriteLockedByCurrentThread() ||
+        fsLock.hasWriteChildLock();
   }
 
   @Override
   public boolean hasReadLock() {
@@ -1801,6 +1802,10 @@
     return this.fsLock.getWriteHoldCount();
   }
 
+  public FSNamesystemLock getFSLock() {
+    return this.fsLock;
+  }
+
   /** Lock the checkpoint lock */
   public void cpLock() {
     this.cpLock.lock();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
index b4f479f..f53f10d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -29,6 +32,7 @@ import java.util.function.Supplier;
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.INodeMap.INodeMapLock;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.log.LogThrottlingHelper;
 import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
@@ -129,6 +133,32 @@
   private static final String OVERALL_METRIC_NAME = "Overall";
 
+  private final ThreadLocal<Collection<INodeMapLock>> partitionLocks =
+      new ThreadLocal<Collection<INodeMapLock>>() {
+        @Override
+        public Collection<INodeMapLock> initialValue() {
+          return new ArrayList<INodeMapLock>();
+        }
+      };
+
+  void addChildLock(INodeMapLock lock) {
+    partitionLocks.get().add(lock);
+  }
+
+  boolean removeChildLock(INodeMapLock lock) {
+    return partitionLocks.get().remove(lock);
+  }
+
+  boolean hasWriteChildLock() {
+    Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
+    // FSNamesystem.LOG.debug("partitionLocks.size = {}", partitionLocks.get().size());
+    while(iter.hasNext()) {
+      if(iter.next().hasWriteChildLock())
+        return true;
+    }
+    return false;
+  }
+
   FSNamesystemLock(Configuration conf,
       MutableRatesWithAggregation detailedHoldTimeMetrics) {
     this(conf, detailedHoldTimeMetrics, new Timer());
@@ -180,11 +210,29 @@
 
   public void readUnlock(String opName,
       Supplier<String> lockReportInfoSupplier) {
+    readUnlock(opName, lockReportInfoSupplier, true);
+  }
+
+  public void readUnlock(String opName,
+      Supplier<String> lockReportInfoSupplier,
+      boolean unlockChildren) {
     final boolean needReport = coarseLock.getReadHoldCount() == 1;
     final long readLockIntervalNanos =
         timer.monotonicNowNanos() - readLockHeldTimeStampNanos.get();
     final long currentTimeMs = timer.now();
-    coarseLock.readLock().unlock();
+
+    if(getReadHoldCount() > 0) {  // Current thread holds the lock
+      // Unlock the top FSNamesystemLock
+      coarseLock.readLock().unlock();
+    }
+
+    if(unlockChildren) {  // Also unlock and remove child locks
+      Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
+      while(iter.hasNext()) {
+        iter.next().readChildUnlock();
+        iter.remove();
+      }
+    }
 
     if (needReport) {
       addMetric(opName, readLockIntervalNanos, false);
@@ -252,7 +300,7 @@
    * FSNamesystemLock#writeUnlock(String, boolean, Supplier)}
    */
   public void writeUnlock() {
-    writeUnlock(OP_NAME_OTHER, false, null);
+    writeUnlock(OP_NAME_OTHER, false, null, true);
   }
 
   /**
@@ -262,7 +310,7 @@
    * @param opName Operation name.
    */
   public void writeUnlock(String opName) {
-    writeUnlock(opName, false, null);
+    writeUnlock(opName, false, null, true);
   }
 
   /**
@@ -274,7 +322,7 @@
    */
   public void writeUnlock(String opName,
       Supplier<String> lockReportInfoSupplier) {
-    writeUnlock(opName, false, lockReportInfoSupplier);
+    writeUnlock(opName, false, lockReportInfoSupplier, true);
   }
 
   /**
@@ -286,7 +334,7 @@
    * for long time will be logged in logs and metrics.
    */
   public void writeUnlock(String opName, boolean suppressWriteLockReport) {
-    writeUnlock(opName, suppressWriteLockReport, null);
+    writeUnlock(opName, suppressWriteLockReport, null, true);
   }
 
   /**
@@ -297,8 +345,9 @@
    * for long time will be logged in logs and metrics.
    * @param lockReportInfoSupplier The info shown in the lock report
    */
-  private void writeUnlock(String opName, boolean suppressWriteLockReport,
-      Supplier<String> lockReportInfoSupplier) {
+  public void writeUnlock(String opName, boolean suppressWriteLockReport,
+      Supplier<String> lockReportInfoSupplier,
+      boolean unlockChildren) {
     final boolean needReport = !suppressWriteLockReport && coarseLock
         .getWriteHoldCount() == 1 && coarseLock.isWriteLockedByCurrentThread();
     final long writeLockIntervalNanos =
@@ -329,7 +378,18 @@
       longestWriteLockHeldInfo = new LockHeldInfo();
     }
 
-    coarseLock.writeLock().unlock();
+    if(this.isWriteLockedByCurrentThread()) {  // Current thread holds the lock
+      // Unlock the top FSNamesystemLock
+      coarseLock.writeLock().unlock();
+    }
+
+    if(unlockChildren) {  // Unlock and remove child locks
+      Iterator<INodeMapLock> iter = partitionLocks.get().iterator();
+      while(iter.hasNext()) {
+        iter.next().writeChildUnlock();
+        iter.remove();
+      }
+    }
 
     if (needReport) {
       addMetric(opName, writeLockIntervalNanos, true);
@@ -355,7 +415,25 @@
   public int getWriteHoldCount() {
     return coarseLock.getWriteHoldCount();
   }
-
+
+  /**
+   * Queries if the read lock is held by any thread.
+   * Holding the write lock counts as holding the read lock.
+   * @return {@code true} if any thread holds the read or write lock and
+   *     {@code false} otherwise
+   */
+  public boolean isReadLocked() {
+    return coarseLock.getReadLockCount() > 0 || isWriteLocked();
+  }
+
+  /**
+   * Queries if the write lock is held by any thread.
+   * @return {@code true} if any thread holds the write lock and
+   *     {@code false} otherwise
+   */
+  public boolean isWriteLocked() {
+    return coarseLock.isWriteLocked();
+  }
+
   public boolean isWriteLockedByCurrentThread() {
     return coarseLock.isWriteLockedByCurrentThread();
   }
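The partitionLocks field added above is a standard thread-local-registry idiom: each thread records the child locks it acquires so that a single top-level unlock can drain them all. A self-contained sketch of the same idiom with plain JDK types (class and method names are hypothetical, not from the patch):

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the per-thread child-lock registry idiom.
public class ChildLockRegistry {
  private final ThreadLocal<Collection<ReentrantReadWriteLock>> held =
      ThreadLocal.withInitial(ArrayList::new);

  void lockChild(ReentrantReadWriteLock lock) {
    lock.writeLock().lock();
    held.get().add(lock);     // remember what this thread latched
  }

  void unlockAllChildren() {  // called once from the top-level unlock
    Iterator<ReentrantReadWriteLock> it = held.get().iterator();
    while (it.hasNext()) {
      it.next().writeLock().unlock();
      it.remove();
    }
  }

  public static void main(String[] args) {
    ChildLockRegistry registry = new ChildLockRegistry();
    ReentrantReadWriteLock partition = new ReentrantReadWriteLock();
    registry.lockChild(partition);
    System.out.println(partition.isWriteLockedByCurrentThread()); // true
    registry.unlockAllChildren();
    System.out.println(partition.isWriteLockedByCurrentThread()); // false
  }
}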
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
index f35949f..88c3233 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
@@ -17,44 +17,139 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.Comparator;
 import java.util.Iterator;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
 import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.LatchLock;
 import org.apache.hadoop.util.LightWeightGSet;
-
-import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.util.PartitionedGSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Storing all the {@link INode}s and maintaining the mapping between INode ID
  * and INode.
  */
 public class INodeMap {
-
-  static INodeMap newInstance(INodeDirectory rootDir) {
-    // Compute the map capacity by allocating 1% of total memory
-    int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
-    GSet<INode, INodeWithAdditionalFields> map =
-        new LightWeightGSet<>(capacity);
-    map.put(rootDir);
-    return new INodeMap(map);
+  public static class INodeIdComparator implements Comparator<INode> {
+    @Override
+    public int compare(INode i1, INode i2) {
+      if (i1 == null || i2 == null) {
+        throw new NullPointerException("Cannot compare null INodes");
+      }
+      long id1 = i1.getId();
+      long id2 = i2.getId();
+      return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
+    }
+  }
+
+  public class INodeMapLock extends LatchLock<ReentrantReadWriteLock> {
+    Logger LOG = LoggerFactory.getLogger(INodeMapLock.class);
+
+    private ReentrantReadWriteLock childLock;
+
+    INodeMapLock() {
+      this(null);
+    }
+
+    private INodeMapLock(ReentrantReadWriteLock childLock) {
+      assert namesystem != null : "namesystem is null";
+      this.childLock = childLock;
+    }
+
+    @Override
+    protected boolean isReadTopLocked() {
+      return namesystem.getFSLock().isReadLocked();
+    }
+
+    @Override
+    protected boolean isWriteTopLocked() {
+      return namesystem.getFSLock().isWriteLocked();
+    }
+
+    @Override
+    protected void readTopUnlock() {
+      namesystem.getFSLock().readUnlock("INodeMap", null, false);
+    }
+
+    @Override
+    protected void writeTopUnlock() {
+      namesystem.getFSLock().writeUnlock("INodeMap", false, null, false);
+    }
+
+    @Override
+    protected boolean hasReadChildLock() {
+      return this.childLock.getReadHoldCount() > 0 || hasWriteChildLock();
+    }
+
+    @Override
+    protected void readChildLock() {
+      // LOG.info("readChildLock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+      this.childLock.readLock().lock();
+      namesystem.getFSLock().addChildLock(this);
+      // LOG.info("readChildLock: done");
+    }
+
+    @Override
+    protected void readChildUnlock() {
+      // LOG.info("readChildUnlock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+      this.childLock.readLock().unlock();
+      // LOG.info("readChildUnlock: done");
+    }
+
+    @Override
+    protected boolean hasWriteChildLock() {
+      return this.childLock.isWriteLockedByCurrentThread();
+    }
+
+    @Override
+    protected void writeChildLock() {
+      // LOG.info("writeChildLock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+      this.childLock.writeLock().lock();
+      namesystem.getFSLock().addChildLock(this);
+      // LOG.info("writeChildLock: done");
+    }
+
+    @Override
+    protected void writeChildUnlock() {
+      // LOG.info("writeChildUnlock: thread = {}, {}", Thread.currentThread().getId(), Thread.currentThread().getName());
+      this.childLock.writeLock().unlock();
+      // LOG.info("writeChildUnlock: done");
+    }
+
+    @Override
+    protected LatchLock<ReentrantReadWriteLock> clone() {
+      return new INodeMapLock(new ReentrantReadWriteLock(false)); // not fair
+    }
+  }
+
+  static INodeMap newInstance(INodeDirectory rootDir,
+      FSNamesystem ns) {
+    return new INodeMap(rootDir, ns);
   }
 
   /** Synchronized by external lock. */
   private final GSet<INode, INodeWithAdditionalFields> map;
-
+  private FSNamesystem namesystem;
+
   public Iterator<INodeWithAdditionalFields> getMapIterator() {
     return map.iterator();
   }
 
-  private INodeMap(GSet<INode, INodeWithAdditionalFields> map) {
-    Preconditions.checkArgument(map != null);
-    this.map = map;
+  private INodeMap(INodeDirectory rootDir, FSNamesystem ns) {
+    this.namesystem = ns;
+    // Compute the map capacity by allocating 1% of total memory
+    int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
+    this.map = new PartitionedGSet<>(capacity, new INodeIdComparator(),
+        new INodeMapLock(), rootDir);
   }
-
+
   /**
    * Add an {@link INode} into the {@link INode} map. Replace the old value if
    * necessary.
@@ -138,4 +233,27 @@ public class INodeMap {
   public void clear() {
     map.clear();
   }
+
+  public void latchWriteLock(INodesInPath iip, INode[] missing) {
+    assert namesystem.hasReadLock() : "must have namesystem lock";
+    assert iip.length() > 0 : "INodesInPath has 0 length";
+    if(!(map instanceof PartitionedGSet)) {
+      return;
+    }
+    // Locks partitions along the path starting from the first existing parent
+    // Locking is in the hierarchical order
+    INode[] allINodes = new INode[Math.min(1, iip.length()) + missing.length];
+    allINodes[0] = iip.getLastINode();
+    System.arraycopy(missing, 0, allINodes, 1, missing.length);
+    /*
+    // Locks all the partitions along the path in the hierarchical order
+    INode[] allINodes = new INode[iip.length() + missing.length];
+    INode[] existing = iip.getINodesArray();
+    System.arraycopy(existing, 0, allINodes, 0, existing.length);
+    System.arraycopy(missing, 0, allINodes, existing.length, missing.length);
+    */
+
+    ((PartitionedGSet<INode, INodeWithAdditionalFields>)
+        map).latchWriteLock(allINodes);
+  }
 }
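Taken together, INodeMapLock and latchWriteLock above implement the latching order: with the global lock held, write-lock the partitions that will be touched, then let the global lock go. The essence of that hand-off can be shown with plain JDK locks; an illustrative, self-contained sketch (names hypothetical, reduced to a single partition):

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative only: the latch hand-off order, one partition.
public class LatchHandOffDemo {
  public static void main(String[] args) {
    ReentrantReadWriteLock top = new ReentrantReadWriteLock();
    ReentrantReadWriteLock partition = new ReentrantReadWriteLock();

    top.writeLock().lock();        // start like a write-locked namesystem op
    partition.writeLock().lock();  // 1. latch the affected partition first...
    top.writeLock().unlock();      // 2. ...then release the top lock
    // Other partitions are now free for other threads,
    // while this partition is still protected.
    System.out.println(partition.isWriteLockedByCurrentThread()); // true
    partition.writeLock().unlock();
  }
}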
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index d813375..4982119 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -186,6 +186,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.Whitebox;
+import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -1997,6 +1998,7 @@ public class DFSTestUtil {
     GenericTestUtils.setLogLevel(NameNode.LOG, level);
     GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
     GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
+    GenericTestUtils.setLogLevel(GSet.LOG, level);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
index b32f8fe..0b21032 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
@@ -1017,23 +1017,38 @@ public class TestINodeFile {
       final Path dir = new Path("/dir");
       hdfs.mkdirs(dir);
-      INodeDirectory dirNode = getDir(fsdir, dir);
-      INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
-      assertSame(dirNode, dirNodeFromNode);
+      cluster.getNamesystem().readLock();
+      try {
+        INodeDirectory dirNode = getDir(fsdir, dir);
+        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+        assertSame(dirNode, dirNodeFromNode);
+      } finally {
+        cluster.getNamesystem().readUnlock();
+      }
 
       // set quota to dir, which leads to node replacement
       hdfs.setQuota(dir, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
-      dirNode = getDir(fsdir, dir);
-      assertTrue(dirNode.isWithQuota());
-      // the inode in inodeMap should also be replaced
-      dirNodeFromNode = fsdir.getInode(dirNode.getId());
-      assertSame(dirNode, dirNodeFromNode);
+      cluster.getNamesystem().readLock();
+      try {
+        INodeDirectory dirNode = getDir(fsdir, dir);
+        assertTrue(dirNode.isWithQuota());
+        // the inode in inodeMap should also be replaced
+        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+        assertSame(dirNode, dirNodeFromNode);
+      } finally {
+        cluster.getNamesystem().readUnlock();
+      }
 
       hdfs.setQuota(dir, -1, -1);
-      dirNode = getDir(fsdir, dir);
-      // the inode in inodeMap should also be replaced
-      dirNodeFromNode = fsdir.getInode(dirNode.getId());
-      assertSame(dirNode, dirNodeFromNode);
+      cluster.getNamesystem().readLock();
+      try {
+        INodeDirectory dirNode = getDir(fsdir, dir);
+        // the inode in inodeMap should also be replaced
+        INode dirNodeFromNode = fsdir.getInode(dirNode.getId());
+        assertSame(dirNode, dirNodeFromNode);
+      } finally {
+        cluster.getNamesystem().readUnlock();
+      }
     } finally {
       if (cluster != null) {
         cluster.shutdown();