This is an automated email from the ASF dual-hosted git repository. zanderxu pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 16c25b277da7c63205975671ba09f0f1a2a664be Author: ZanderXu <zande...@apache.org> AuthorDate: Tue Mar 5 22:10:59 2024 +0800 HDFS-17398. [FGL] Implement the FGL lock for FSNLockManager (#6599) --- .../namenode/fgl/FineGrainedFSNamesystemLock.java | 300 +++++++++++++++++++++ .../fgl/TestFineGrainedFSNamesystemLock.java | 275 +++++++++++++++++++ 2 files changed, 575 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/fgl/FineGrainedFSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/fgl/FineGrainedFSNamesystemLock.java new file mode 100644 index 00000000000..c2e3980476a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/fgl/FineGrainedFSNamesystemLock.java @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hdfs.server.namenode.fgl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystemLock;
+import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+/**
+ * Splitting the global FSN lock into FSLock and BMLock.
+ * FSLock is used to protect directory tree-related operations.
+ * BMLock is used to protect block-related and dn-related operations.
+ * The lock order should be: FSLock,BMLock.
+ *
+ * <p>Every operation takes a {@link FSNamesystemLockMode}:
+ * <ul>
+ *   <li>{@code FS} touches only the FSLock;</li>
+ *   <li>{@code BM} touches only the BMLock;</li>
+ *   <li>{@code GLOBAL} acquires the FSLock first and then the BMLock, and
+ *       releases them in the reverse order (BMLock first, then FSLock).</li>
+ * </ul>
+ * A mode that matches none of the three is silently ignored by the
+ * lock/unlock methods; the query methods return {@code false} or {@code -1}
+ * for it.
+ */
+public class FineGrainedFSNamesystemLock implements FSNLockManager {
+  /** Lock guarding the directory tree; always taken before {@link #bmLock}. */
+  private final FSNamesystemLock fsLock;
+  /** Lock guarding block and datanode state; always taken after {@link #fsLock}. */
+  private final FSNamesystemLock bmLock;
+
+  /**
+   * Creates the two underlying locks from the same configuration and the same
+   * metrics aggregation object.
+   *
+   * @param conf configuration handed to both underlying locks
+   * @param aggregation metrics sink handed to both underlying locks
+   */
+  public FineGrainedFSNamesystemLock(Configuration conf, MutableRatesWithAggregation aggregation) {
+    this.fsLock = new FSNamesystemLock(conf, aggregation);
+    this.bmLock = new FSNamesystemLock(conf, aggregation);
+  }
+
+  /**
+   * Acquires the read lock(s) selected by {@code lockMode}.
+   * GLOBAL takes the FSLock and then the BMLock.
+   *
+   * @param lockMode which lock(s) to acquire
+   */
+  @Override
+  public void readLock(FSNamesystemLockMode lockMode) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      this.fsLock.readLock();
+      this.bmLock.readLock();
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      this.fsLock.readLock();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      this.bmLock.readLock();
+    }
+  }
+
+  /**
+   * Interruptible variant of {@link #readLock(FSNamesystemLockMode)}.
+   * In GLOBAL mode, if the thread is interrupted while waiting for the BMLock,
+   * the already-held FSLock is released before the exception propagates, so
+   * the caller never ends up holding only half of the global lock.
+   *
+   * <p>NOTE(review): unlike {@link #writeLockInterruptibly}, this method
+   * carries no {@code @Override}; confirm whether FSNLockManager declares it.
+   *
+   * @param lockMode which lock(s) to acquire
+   * @throws InterruptedException if interrupted while acquiring a lock
+   */
+  public void readLockInterruptibly(FSNamesystemLockMode lockMode) throws InterruptedException {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      this.fsLock.readLockInterruptibly();
+      try {
+        this.bmLock.readLockInterruptibly();
+      } catch (InterruptedException e) {
+        // The held FSLock should be released if the current thread is interrupted
+        // while acquiring the BMLock.
+        this.fsLock.readUnlock("BMReadLockInterruptiblyFailed");
+        throw e;
+      }
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      this.fsLock.readLockInterruptibly();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      this.bmLock.readLockInterruptibly();
+    }
+  }
+
+  /**
+   * Releases the read lock(s) selected by {@code lockMode}.
+   * GLOBAL releases the BMLock first and then the FSLock.
+   *
+   * @param lockMode which lock(s) to release
+   * @param opName operation name forwarded to the underlying lock(s)
+   */
+  @Override
+  public void readUnlock(FSNamesystemLockMode lockMode, String opName) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      this.bmLock.readUnlock(opName);
+      this.fsLock.readUnlock(opName);
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      this.fsLock.readUnlock(opName);
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      this.bmLock.readUnlock(opName);
+    }
+  }
+
+  /**
+   * Same as {@link #readUnlock(FSNamesystemLockMode, String)}, additionally
+   * forwarding {@code lockReportInfoSupplier} to the underlying lock(s).
+   *
+   * @param lockMode which lock(s) to release
+   * @param opName operation name forwarded to the underlying lock(s)
+   * @param lockReportInfoSupplier supplier forwarded to the underlying lock(s)
+   */
+  public void readUnlock(FSNamesystemLockMode lockMode, String opName,
+      Supplier<String> lockReportInfoSupplier) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      this.bmLock.readUnlock(opName, lockReportInfoSupplier);
+      this.fsLock.readUnlock(opName, lockReportInfoSupplier);
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      this.fsLock.readUnlock(opName, lockReportInfoSupplier);
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      this.bmLock.readUnlock(opName, lockReportInfoSupplier);
+    }
+  }
+
+  /**
+   * Acquires the write lock(s) selected by {@code lockMode}.
+   * GLOBAL takes the FSLock and then the BMLock.
+   *
+   * @param lockMode which lock(s) to acquire
+   */
+  @Override
+  public void writeLock(FSNamesystemLockMode lockMode) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      this.fsLock.writeLock();
+      this.bmLock.writeLock();
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      this.fsLock.writeLock();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      this.bmLock.writeLock();
+    }
+  }
+
+  /**
+   * Releases the write lock(s) selected by {@code lockMode}.
+   * GLOBAL releases the BMLock first and then the FSLock.
+   *
+   * @param lockMode which lock(s) to release
+   * @param opName operation name forwarded to the underlying lock(s)
+   */
+  @Override
+  public void writeUnlock(FSNamesystemLockMode lockMode, String opName) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      this.bmLock.writeUnlock(opName);
+      this.fsLock.writeUnlock(opName);
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      this.fsLock.writeUnlock(opName);
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      this.bmLock.writeUnlock(opName);
+    }
+  }
+
+  /**
+   * Same as {@link #writeUnlock(FSNamesystemLockMode, String)}, additionally
+   * forwarding {@code suppressWriteLockReport} to the underlying lock(s).
+   *
+   * @param lockMode which lock(s) to release
+   * @param opName operation name forwarded to the underlying lock(s)
+   * @param suppressWriteLockReport flag forwarded to the underlying lock(s)
+   */
+  @Override
+  public void writeUnlock(FSNamesystemLockMode lockMode, String opName,
+      boolean suppressWriteLockReport) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      this.bmLock.writeUnlock(opName, suppressWriteLockReport);
+      this.fsLock.writeUnlock(opName, suppressWriteLockReport);
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      this.fsLock.writeUnlock(opName, suppressWriteLockReport);
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      this.bmLock.writeUnlock(opName, suppressWriteLockReport);
+    }
+  }
+
+  /**
+   * Same as {@link #writeUnlock(FSNamesystemLockMode, String)}, additionally
+   * forwarding {@code lockReportInfoSupplier} to the underlying lock(s).
+   *
+   * @param lockMode which lock(s) to release
+   * @param opName operation name forwarded to the underlying lock(s)
+   * @param lockReportInfoSupplier supplier forwarded to the underlying lock(s)
+   */
+  public void writeUnlock(FSNamesystemLockMode lockMode, String opName,
+      Supplier<String> lockReportInfoSupplier) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      this.bmLock.writeUnlock(opName, lockReportInfoSupplier);
+      this.fsLock.writeUnlock(opName, lockReportInfoSupplier);
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      this.fsLock.writeUnlock(opName, lockReportInfoSupplier);
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      this.bmLock.writeUnlock(opName, lockReportInfoSupplier);
+    }
+  }
+
+  /**
+   * Interruptible variant of {@link #writeLock(FSNamesystemLockMode)}.
+   * In GLOBAL mode, if the thread is interrupted while waiting for the BMLock,
+   * the already-held FSLock is released before the exception propagates.
+   *
+   * @param lockMode which lock(s) to acquire
+   * @throws InterruptedException if interrupted while acquiring a lock
+   */
+  @Override
+  public void writeLockInterruptibly(FSNamesystemLockMode lockMode)
+      throws InterruptedException {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      this.fsLock.writeLockInterruptibly();
+      try {
+        this.bmLock.writeLockInterruptibly();
+      } catch (InterruptedException e) {
+        // The held FSLock should be released if the current thread is interrupted
+        // while acquiring the BMLock.
+        this.fsLock.writeUnlock("BMWriteLockInterruptiblyFailed");
+        throw e;
+      }
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      this.fsLock.writeLockInterruptibly();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      this.bmLock.writeLockInterruptibly();
+    }
+  }
+
+  /**
+   * Reports whether the current thread holds the write lock(s) selected by
+   * {@code lockMode}. For GLOBAL the answer is taken from the FSLock; when
+   * assertions are enabled, the BMLock is asserted to be in the same state.
+   *
+   * @param lockMode which lock(s) to query
+   * @return {@code true} if the current thread holds the selected write lock(s)
+   */
+  @Override
+  public boolean hasWriteLock(FSNamesystemLockMode lockMode) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      if (this.fsLock.isWriteLockedByCurrentThread()) {
+        // The bm writeLock should be held by the current thread.
+        assert this.bmLock.isWriteLockedByCurrentThread();
+        return true;
+      } else {
+        // The bm writeLock should not be held by the current thread.
+        assert !this.bmLock.isWriteLockedByCurrentThread();
+        return false;
+      }
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      return this.fsLock.isWriteLockedByCurrentThread();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      return this.bmLock.isWriteLockedByCurrentThread();
+    }
+    return false;
+  }
+
+  /**
+   * Reports whether the current thread holds the read lock(s) selected by
+   * {@code lockMode}; holding the corresponding write lock also counts.
+   * For GLOBAL the answer is taken from the FSLock; when assertions are
+   * enabled, the BMLock is asserted to be in the same state.
+   *
+   * @param lockMode which lock(s) to query
+   * @return {@code true} if the current thread holds the selected read (or write) lock(s)
+   */
+  @Override
+  public boolean hasReadLock(FSNamesystemLockMode lockMode) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      if (hasWriteLock(FSNamesystemLockMode.GLOBAL)) {
+        return true;
+      } else if (this.fsLock.getReadHoldCount() > 0) {
+        // The bm readLock should be held by the current thread.
+        assert this.bmLock.getReadHoldCount() > 0;
+        return true;
+      } else {
+        // The bm readLock should not be held by the current thread.
+        assert this.bmLock.getReadHoldCount() <= 0;
+        return false;
+      }
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      return this.fsLock.getReadHoldCount() > 0 || this.fsLock.isWriteLockedByCurrentThread();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      return this.bmLock.getReadHoldCount() > 0 || this.bmLock.isWriteLockedByCurrentThread();
+    }
+    return false;
+  }
+
+  /**
+   * This method is only used for ComputeDirectoryContentSummary.
+   * For the GLOBAL mode, just return the FSLock's ReadHoldCount.
+   *
+   * @param lockMode which lock to query
+   * @return the read hold count of the selected lock, or {@code -1} for an
+   *         unrecognized mode
+   */
+  @Override
+  public int getReadHoldCount(FSNamesystemLockMode lockMode) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      return this.fsLock.getReadHoldCount();
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      return this.fsLock.getReadHoldCount();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      return this.bmLock.getReadHoldCount();
+    }
+    return -1;
+  }
+
+  /**
+   * Returns the queue length of the FSLock or the BMLock.
+   *
+   * @param lockMode which lock to query
+   * @return the selected lock's queue length; {@code -1} for GLOBAL, which has
+   *         no single queue, and for an unrecognized mode
+   */
+  @Override
+  public int getQueueLength(FSNamesystemLockMode lockMode) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      return -1;
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      return this.fsLock.getQueueLength();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      return this.bmLock.getQueueLength();
+    }
+    return -1;
+  }
+
+  /**
+   * Returns the long-hold read lock counter of the FSLock or the BMLock.
+   *
+   * @param lockMode which lock to query
+   * @return the selected lock's counter; {@code -1} for GLOBAL and for an
+   *         unrecognized mode
+   */
+  @Override
+  public long getNumOfReadLockLongHold(FSNamesystemLockMode lockMode) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      return -1;
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      return this.fsLock.getNumOfReadLockLongHold();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      return this.bmLock.getNumOfReadLockLongHold();
+    }
+    return -1;
+  }
+
+  /**
+   * Returns the long-hold write lock counter of the FSLock or the BMLock.
+   *
+   * @param lockMode which lock to query
+   * @return the selected lock's counter; {@code -1} for GLOBAL and for an
+   *         unrecognized mode
+   */
+  @Override
+  public long getNumOfWriteLockLongHold(FSNamesystemLockMode lockMode) {
+    if (lockMode.equals(FSNamesystemLockMode.GLOBAL)) {
+      return -1;
+    } else if (lockMode.equals(FSNamesystemLockMode.FS)) {
+      return this.fsLock.getNumOfWriteLockLongHold();
+    } else if (lockMode.equals(FSNamesystemLockMode.BM)) {
+      return this.bmLock.getNumOfWriteLockLongHold();
+    }
+    return -1;
+  }
+
+  /**
+   * @return the FSLock's metrics flag; the setters below write the same value
+   *         to both locks, so only one of them is read back
+   */
+  @Override
+  public boolean isMetricsEnabled() {
+    return this.fsLock.isMetricsEnabled();
+  }
+
+  /**
+   * Sets the metrics flag on both underlying locks.
+   *
+   * @param metricsEnabled value forwarded to both locks
+   */
+  public void setMetricsEnabled(boolean metricsEnabled) {
+    this.fsLock.setMetricsEnabled(metricsEnabled);
+    this.bmLock.setMetricsEnabled(metricsEnabled);
+  }
+
+  /**
+   * Sets the read lock reporting threshold on both underlying locks.
+   *
+   * @param readLockReportingThresholdMs threshold in milliseconds
+   */
+  @Override
+  public void setReadLockReportingThresholdMs(long readLockReportingThresholdMs) {
+    this.fsLock.setReadLockReportingThresholdMs(readLockReportingThresholdMs);
+    this.bmLock.setReadLockReportingThresholdMs(readLockReportingThresholdMs);
+  }
+
+  /**
+   * @return the FSLock's read lock reporting threshold in milliseconds
+   */
+  @Override
+  public long getReadLockReportingThresholdMs() {
+    return this.fsLock.getReadLockReportingThresholdMs();
+  }
+
+  /**
+   * Sets the write lock reporting threshold on both underlying locks.
+   *
+   * @param writeLockReportingThresholdMs threshold in milliseconds
+   */
+  @Override
+  public void setWriteLockReportingThresholdMs(long writeLockReportingThresholdMs) {
+    this.fsLock.setWriteLockReportingThresholdMs(writeLockReportingThresholdMs);
+    this.bmLock.setWriteLockReportingThresholdMs(writeLockReportingThresholdMs);
+  }
+
+  /**
+   * @return the FSLock's write lock reporting threshold in milliseconds
+   */
+  @Override
+  public long getWriteLockReportingThresholdMs() {
+    return this.fsLock.getWriteLockReportingThresholdMs();
+  }
+
+  /**
+   * Not supported: this manager wraps two locks, so a single lock cannot be
+   * injected.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void setLockForTests(ReentrantReadWriteLock lock) {
+    throw new UnsupportedOperationException("SetLockTests is unsupported");
+  }
+
+  /**
+   * Not supported: this manager wraps two locks, so there is no single lock
+   * to hand out.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public ReentrantReadWriteLock getLockForTests() {
+    // The message used to be a copy of the setter's text, which pointed
+    // callers at the wrong method.
+    throw new UnsupportedOperationException("GetLockForTests is unsupported");
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/fgl/TestFineGrainedFSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/fgl/TestFineGrainedFSNamesystemLock.java
new file mode 100644
index 00000000000..88d66cb3e9b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/fgl/TestFineGrainedFSNamesystemLock.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.fgl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Multi-threaded smoke test for {@link FineGrainedFSNamesystemLock}: many
+ * tasks repeatedly take and release the GLOBAL, FS and BM locks in read,
+ * write and interruptible flavours, and the per-mode counters must be back
+ * at zero once every task has finished.
+ */
+public class TestFineGrainedFSNamesystemLock {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestFineGrainedFSNamesystemLock.class);
+
+  /** Number of tasks submitted, and size of the thread pool running them. */
+  private static final int TASK_COUNT = 1000;
+
+  /** One lock/unlock exercise; implemented by the private helpers below. */
+  @FunctionalInterface
+  private interface LockExercise {
+    void run(FSNLockManager fsn, FSNamesystemLockMode mode, String opName, AtomicLong counter);
+  }
+
+  /**
+   * @return a random iteration count in the range [2000, 3000)
+   */
+  private int getLoopNumber() {
+    return ThreadLocalRandom.current().nextInt(2000, 3000);
+  }
+
+  /**
+   * Builds a task that runs {@code exercise} a random number of times.
+   *
+   * @param exercise lock exercise to repeat
+   * @param fsn lock manager under test
+   * @param mode lock mode handed to the exercise
+   * @param opName operation name handed to the exercise
+   * @param counter counter the exercise increments/decrements under the lock
+   * @param executed tally of completed iterations, used only for logging
+   * @return the task; it always returns {@code true}
+   */
+  private Callable<Boolean> newTask(LockExercise exercise, FSNLockManager fsn,
+      FSNamesystemLockMode mode, String opName, AtomicLong counter, AtomicLong executed) {
+    return () -> {
+      // Draw the bound once. Re-drawing it in the loop condition would compare
+      // the index against a different random number on every iteration.
+      final int loops = getLoopNumber();
+      for (int round = 0; round < loops; round++) {
+        exercise.run(fsn, mode, opName, counter);
+        executed.incrementAndGet();
+      }
+      return true;
+    };
+  }
+
+  /**
+   * Test read/write lock of Global, FS and BM model through multi-threading.
+   */
+  @Test(timeout=120000)
+  public void testMultipleThreadsUsingLocks()
+      throws InterruptedException, ExecutionException {
+    FineGrainedFSNamesystemLock fsn = new FineGrainedFSNamesystemLock(new Configuration(), null);
+    ExecutorService service = HadoopExecutors.newFixedThreadPool(TASK_COUNT);
+    try {
+      AtomicLong globalCount = new AtomicLong(0);
+      AtomicLong fsCount = new AtomicLong(0);
+      AtomicLong bmCount = new AtomicLong(0);
+      AtomicLong globalNumber = new AtomicLong(0);
+      AtomicLong fsNumber = new AtomicLong(0);
+      AtomicLong bmNumber = new AtomicLong(0);
+
+      List<Callable<Boolean>> callableList = new ArrayList<>(TASK_COUNT);
+      for (int i = 0; i < TASK_COUNT; i++) {
+        String opName = Integer.toString(i);
+        switch (i % 12) {
+        case 0: // Test the global write lock via multiple threads.
+          callableList.add(newTask(this::writeLock, fsn,
+              FSNamesystemLockMode.GLOBAL, opName, globalCount, globalNumber));
+          break;
+        case 1: // Test the fs write lock via multiple threads.
+          callableList.add(newTask(this::writeLock, fsn,
+              FSNamesystemLockMode.FS, opName, fsCount, fsNumber));
+          break;
+        case 2: // Test the bm write lock via multiple threads.
+          callableList.add(newTask(this::writeLock, fsn,
+              FSNamesystemLockMode.BM, opName, bmCount, bmNumber));
+          break;
+        case 3: // Test the bm read lock via multiple threads.
+          callableList.add(newTask(this::readLock, fsn,
+              FSNamesystemLockMode.BM, opName, bmCount, bmNumber));
+          break;
+        case 4: // Test the fs read lock via multiple threads.
+          callableList.add(newTask(this::readLock, fsn,
+              FSNamesystemLockMode.FS, opName, fsCount, fsNumber));
+          break;
+        case 5: // Test the global read lock via multiple threads.
+          callableList.add(newTask(this::readLock, fsn,
+              FSNamesystemLockMode.GLOBAL, opName, globalCount, globalNumber));
+          break;
+        case 6: // Test the global interruptible write lock via multiple threads.
+          callableList.add(newTask(this::writeLockInterruptibly, fsn,
+              FSNamesystemLockMode.GLOBAL, opName, globalCount, globalNumber));
+          break;
+        case 7: // Test the fs interruptible write lock via multiple threads.
+          callableList.add(newTask(this::writeLockInterruptibly, fsn,
+              FSNamesystemLockMode.FS, opName, fsCount, fsNumber));
+          break;
+        case 8: // Test the bm interruptible write lock via multiple threads.
+          callableList.add(newTask(this::writeLockInterruptibly, fsn,
+              FSNamesystemLockMode.BM, opName, bmCount, bmNumber));
+          break;
+        case 9: // Test the bm interruptible read lock via multiple threads.
+          callableList.add(newTask(this::readLockInterruptibly, fsn,
+              FSNamesystemLockMode.BM, opName, bmCount, bmNumber));
+          break;
+        case 10: // Test the fs interruptible read lock via multiple threads.
+          callableList.add(newTask(this::readLockInterruptibly, fsn,
+              FSNamesystemLockMode.FS, opName, fsCount, fsNumber));
+          break;
+        default: // Test the global interruptible read lock via multiple threads.
+          callableList.add(newTask(this::readLockInterruptibly, fsn,
+              FSNamesystemLockMode.GLOBAL, opName, globalCount, globalNumber));
+          break;
+        }
+      }
+
+      List<Future<Boolean>> futures = service.invokeAll(callableList);
+      for (Future<Boolean> f : futures) {
+        // Surfaces any exception thrown inside a task as ExecutionException.
+        f.get();
+      }
+      LOG.info("Global executed {} times, FS executed {} times, BM executed {} times.",
+          globalNumber.get(), fsNumber.get(), bmNumber.get());
+      // JUnit assertions rather than the assert keyword, so the checks still
+      // run when the JVM is started without -ea.
+      assertEquals("GLOBAL counter must be balanced", 0L, globalCount.get());
+      assertEquals("FS counter must be balanced", 0L, fsCount.get());
+      assertEquals("BM counter must be balanced", 0L, bmCount.get());
+    } finally {
+      // Release the pool threads even when the test fails or times out.
+      service.shutdownNow();
+    }
+  }
+
+  /**
+   * Test write lock for the input lock mode: increments the counter under the
+   * write lock, then decrements it under a second acquisition of the lock.
+   * @param fsn FSNLockManager
+   * @param mode LockMode
+   * @param opName operation name
+   * @param counter counter to trace this lock mode
+   */
+  private void writeLock(FSNLockManager fsn, FSNamesystemLockMode mode,
+      String opName, AtomicLong counter) {
+    fsn.writeLock(mode);
+    try {
+      counter.incrementAndGet();
+    } finally {
+      fsn.writeUnlock(mode, opName);
+    }
+    fsn.writeLock(mode);
+    try {
+      counter.decrementAndGet();
+    } finally {
+      fsn.writeUnlock(mode, opName);
+    }
+  }
+
+  /**
+   * Test read lock for the input lock mode: reads the counter under the read
+   * lock and discards the value.
+   * @param fsn FSNLockManager
+   * @param mode LockMode
+   * @param opName operation name
+   * @param counter counter to trace this lock mode
+   */
+  private void readLock(FSNLockManager fsn, FSNamesystemLockMode mode,
+      String opName, AtomicLong counter) {
+    fsn.readLock(mode);
+    try {
+      counter.get();
+    } finally {
+      fsn.readUnlock(mode, opName);
+    }
+  }
+
+  /**
+   * Test interruptible write lock for the input lock mode. The counter is
+   * incremented under the lock; if that succeeded, the decrement is retried
+   * until it succeeds too, so the counter always ends up balanced.
+   * @param fsn FSNLockManager
+   * @param mode LockMode
+   * @param opName operation name
+   * @param counter counter to trace this lock mode
+   */
+  private void writeLockInterruptibly(FSNLockManager fsn, FSNamesystemLockMode mode,
+      String opName, AtomicLong counter) {
+    boolean success = false;
+    try {
+      fsn.writeLockInterruptibly(mode);
+      try {
+        counter.incrementAndGet();
+        success = true;
+      } finally {
+        fsn.writeUnlock(mode, opName);
+      }
+    } catch (InterruptedException ignored) {
+      // Deliberately ignored: nothing was incremented, so nothing to undo.
+      LOG.info("InterruptedException happens in thread {}"
+          + " during increasing the Count.", opName);
+    }
+    while (success) {
+      try {
+        fsn.writeLockInterruptibly(mode);
+        try {
+          counter.decrementAndGet();
+          success = false;
+        } finally {
+          fsn.writeUnlock(mode, opName);
+        }
+      } catch (InterruptedException ignored) {
+        // Deliberately ignored and not re-asserted: setting the interrupt flag
+        // again here would make every retry fail immediately and spin forever.
+        LOG.info("InterruptedException happens in thread {}"
+            + " during decreasing the Count.", opName);
+      }
+    }
+  }
+
+  /**
+   * Test interruptible read lock for the input lock mode: reads the counter
+   * under the read lock and discards the value.
+   * @param fsn FSNLockManager
+   * @param mode LockMode
+   * @param opName operation name
+   * @param counter counter to trace this lock mode
+   */
+  private void readLockInterruptibly(FSNLockManager fsn, FSNamesystemLockMode mode,
+      String opName, AtomicLong counter) {
+    try {
+      fsn.readLockInterruptibly(mode);
+      try {
+        counter.get();
+      } finally {
+        fsn.readUnlock(mode, opName);
+      }
+    } catch (InterruptedException ignored) {
+      // Deliberately ignored: the read has no side effect to undo.
+      LOG.info("InterruptedException happens in thread {}"
+          + " during getting the Count.", opName);
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org