This is an automated email from the ASF dual-hosted git repository.

zanderxu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit e8807726b8d83fa2fb1f9ec47a000237163cd994
Author: ZanderXu <zande...@apache.org>
AuthorDate: Wed Dec 18 18:24:01 2024 +0800

    HDFS-17506. [FGL] Performance for phase 1
---
 .../namenode/fgl/FSNLockBenchmarkThroughput.java   | 322 +++++++++++++++++++++
 .../fgl/TestFSNLockBenchmarkThroughput.java        | 104 +++++++
 2 files changed, 426 insertions(+)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/fgl/FSNLockBenchmarkThroughput.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/fgl/FSNLockBenchmarkThroughput.java
new file mode 100644
index 00000000000..c16a8961c20
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/fgl/FSNLockBenchmarkThroughput.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.fgl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * This class benchmarks NameNode throughput under both the global lock and
+ * the fine-grained lock.
+ * It uses commonly used RPCs, such as create, addBlock, complete, append,
+ * rename, delete, setPermission, setOwner, setReplication, getFileInfo,
+ * getListing and getBlockLocation, to build tasks according to the
+ * read/write ratio and the testing count.
+ * It then creates a thread pool with numClients threads to perform these
+ * tasks. The performance difference between the global lock and the
+ * fine-grained lock can be derived from the execution times.
+ */
+public class FSNLockBenchmarkThroughput extends Configured implements Tool {
+
+  private final FileSystem fileSystem;
+
+  public FSNLockBenchmarkThroughput(FileSystem fileSystem) {
+    this.fileSystem = fileSystem;
+  }
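+
+  // Example invocation (illustrative only; it assumes this test-scope class
+  // is on the classpath of the 'hadoop' launcher, which can run an arbitrary
+  // CLASSNAME; the argument values are the tool's own defaults):
+  //   hadoop org.apache.hadoop.hdfs.server.namenode.fgl.FSNLockBenchmarkThroughput \
+  //       /tmp/fsnlock/benchmark/throughput 20 100 100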
+
+  public void benchmark(Path basePath, int readWriteRatio, int testingCount,
+      int numClients) throws Exception {
+    // Prepare some files for the read tasks.
+    ArrayList<Path> readingPaths = new ArrayList<>();
+    for (int i = 0; i < 30; i++) {
+      Path path = new Path(basePath, "reading_" + i);
+      internalWriteFile(this.fileSystem, path, true, null);
+      readingPaths.add(path);
+    }
+
+    HashMap<String, Integer> detailInfo = new HashMap<>();
+
+    // Build the tasks.
+    List<Callable<Void>> tasks = buildTasks(this.fileSystem,
+        basePath, testingCount, readWriteRatio, readingPaths, detailInfo);
+    Collections.shuffle(tasks);
+
+    long startTime = System.currentTimeMillis();
+
+    ExecutorService executors = Executors.newFixedThreadPool(numClients);
+    try {
+      // Submit all tasks and wait for them to complete.
+      List<Future<Void>> futures = executors.invokeAll(tasks);
+
+      // Propagate any task failure.
+      for (Future<Void> f : futures) {
+        f.get();
+      }
+
+      long endTime = System.currentTimeMillis();
+      // Print the result.
+      System.out.println("The Benchmark result is: " + tasks.size()
+          + " tasks with readWriteRatio " + readWriteRatio + " completed, taking "
+          + (endTime - startTime) + "(ms)");
+      detailInfo.forEach(
+          (k, v) -> System.out.println("\t operationName:" + k + ", testCount:" + v));
+    } finally {
+      executors.shutdownNow();
+
+      for (Path path : readingPaths) {
+        this.fileSystem.delete(path, false);
+      }
+    }
+  }
+
+  // Write a little data to the path.
+  private void internalWriteFile(FileSystem fs, Path path, boolean writeData,
+      HashMap<String, Integer> detailInfo) throws IOException {
+    try (FSDataOutputStream outputStream = fs.create(path)) {
+      incOp(detailInfo, "create");
+      incOp(detailInfo, "complete");
+      if (writeData) {
+        byte[] data = new byte[1024];
+        ThreadLocalRandom.current().nextBytes(data);
+        outputStream.write(data);
+        incOp(detailInfo, "addBlock");
+      }
+    }
+  }
+
+  // Append a little data to the path.
+  private void internalAppendFile(FileSystem fs, Path path, boolean writeData,
+      HashMap<String, Integer> detailInfo) throws IOException {
+    try (FSDataOutputStream outputStream = fs.append(path)) {
+      incOp(detailInfo, "append");
+      incOp(detailInfo, "complete");
+      if (writeData) {
+        byte[] data = new byte[1024];
+        ThreadLocalRandom.current().nextBytes(data);
+        outputStream.write(data);
+        // Appending to an empty file allocates a new block.
+        incOp(detailInfo, "addBlock");
+      }
+    }
+  }
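+
+  // Note on RPC accounting in the helpers above: in the HDFS client protocol,
+  // a create()/close() pair maps to the create and complete RPCs, and writing
+  // the first byte of a new block triggers an addBlock RPC. That is why
+  // create/complete are counted unconditionally and addBlock only when data
+  // is actually written.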
+
+  /**
+   * Include getListing.
+   */
+  private Callable<Void> getListing(FileSystem fs, Path path,
+      HashMap<String, Integer> detailInfo) {
+    return () -> {
+      fs.listStatus(path);
+      incOp(detailInfo, "getListing");
+      return null;
+    };
+  }
+
+  // Synchronized so that concurrent tasks can safely update the shared map.
+  private synchronized void incOp(HashMap<String, Integer> detailInfo, String opName) {
+    if (detailInfo != null) {
+      int value = detailInfo.getOrDefault(opName, 0);
+      detailInfo.put(opName, value + 1);
+    }
+  }
+
+  /**
+   * Include getBlockLocation.
+   */
+  private Callable<Void> getBlockLocation(FileSystem fs, Path path,
+      HashMap<String, Integer> detailInfo) {
+    return () -> {
+      fs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
+      incOp(detailInfo, "getBlockLocation");
+      return null;
+    };
+  }
+
+  /**
+   * Include getFileInfo.
+   */
+  private Callable<Void> getFileInfo(FileSystem fs, Path path,
+      HashMap<String, Integer> detailInfo) {
+    return () -> {
+      fs.getFileStatus(path);
+      incOp(detailInfo, "getFileInfo");
+      return null;
+    };
+  }
+
+  /**
+   * Include create, complete and delete. No data is written, so no addBlock
+   * is issued.
+   */
+  private Callable<Void> writeAndDeleteFile(FileSystem fs, Path path,
+      HashMap<String, Integer> detailInfo) {
+    return () -> {
+      internalWriteFile(fs, path, false, detailInfo);
+      fs.delete(path, false);
+      incOp(detailInfo, "delete");
+      return null;
+    };
+  }
+
+  /**
+   * Include create, complete, append, addBlock, complete, rename,
+   * setPermission, setOwner, setReplication and delete.
+   */
+  private Callable<Void> otherWriteOperation(FileSystem fs, Path path,
+      Path renameTargetPath, HashMap<String, Integer> detailInfo) {
+    return () -> {
+      // Create an empty file.
+      internalWriteFile(fs, path, false, detailInfo);
+
+      // Append some data.
+      internalAppendFile(fs, path, true, detailInfo);
+
+      // Rename.
+      fs.rename(path, renameTargetPath);
+      incOp(detailInfo, "rename");
+
+      // SetPermission.
+      fs.setPermission(renameTargetPath, FsPermission.getDirDefault());
+      incOp(detailInfo, "setPermission");
+
+      // SetOwner.
+      fs.setOwner(renameTargetPath, "mock_user", "mock_group");
+      incOp(detailInfo, "setOwner");
+
+      // SetReplication.
+      fs.setReplication(renameTargetPath, (short) 4);
+      incOp(detailInfo, "setReplication");
+
+      // Delete.
+      fs.delete(renameTargetPath, false);
+      incOp(detailInfo, "delete");
+
+      return null;
+    };
+  }
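+
+  // Task-count arithmetic for the method below: each iteration adds
+  // 10 writeAndDeleteFile tasks, 1 otherWriteOperation task and
+  // readWriteRatio * 50 read tasks. With the defaults (testingCount = 100,
+  // readWriteRatio = 20) that is 100 * (10 + 1 + 1000) = 101,100 tasks,
+  // of which 100,000 are reads.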
+
+  /**
+   * Build callable tasks according to testingCount and readWriteRatio.
+   */
+  private List<Callable<Void>> buildTasks(FileSystem fs, Path basePath, int testingCount,
+      int readWriteRatio, ArrayList<Path> readingPaths, HashMap<String, Integer> detailInfo) {
+    List<Callable<Void>> tasks = new ArrayList<>();
+
+    for (int i = 0; i < testingCount; i++) {
+      // Contains 3 * 10 write RPCs (create, complete and delete per file).
+      for (int j = 0; j < 10; j++) {
+        Path path = new Path(basePath, "write_" + i + "_" + j);
+        tasks.add(writeAndDeleteFile(fs, path, detailInfo));
+      }
+
+      // Contains 10 write RPCs.
+      Path srcPath = new Path(basePath, "src_" + i + "_" + System.nanoTime());
+      Path targetPath = new Path(basePath, "target_" + i + "_" + System.nanoTime());
+      tasks.add(otherWriteOperation(fs, srcPath, targetPath, detailInfo));
+
+      // The write tasks above issue about 40 write RPCs per iteration, so add
+      // readWriteRatio * 50 read tasks to approximate the read/write ratio.
+      for (int j = 0; j < readWriteRatio * 50; j++) {
+        int opNumber = ThreadLocalRandom.current().nextInt(5);
+        int pathIndex = ThreadLocalRandom.current().nextInt(readingPaths.size());
+        Path path = readingPaths.get(pathIndex);
+        if (opNumber <= 1) {
+          tasks.add(getFileInfo(fs, path, detailInfo));
+        } else if (opNumber <= 3) {
+          tasks.add(getBlockLocation(fs, path, detailInfo));
+        } else {
+          tasks.add(getListing(fs, path, detailInfo));
+        }
+      }
+    }
+    return tasks;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    String basePath = "/tmp/fsnlock/benchmark/throughput";
+    int readWriteRatio = 20;
+    int testingCount = 100;
+    int numClients = 100;
+
+    if (args.length >= 4) {
+      basePath = args[0];
+
+      try {
+        readWriteRatio = Integer.parseInt(args[1]);
+        if (readWriteRatio <= 0) {
+          printUsage("Invalid readWrite ratio: " + readWriteRatio);
+        }
+      } catch (NumberFormatException e) {
+        printUsage("Invalid readWrite ratio: " + e.getMessage());
+      }
+
+      try {
+        testingCount = Integer.parseInt(args[2]);
+        if (testingCount <= 0) {
+          printUsage("Invalid testing count: " + testingCount);
+        }
+      } catch (NumberFormatException e) {
+        printUsage("Invalid testing count: " + e.getMessage());
+      }
+
+      try {
+        numClients = Integer.parseInt(args[3]);
+        if (numClients <= 0) {
+          printUsage("Invalid num of clients: " + numClients);
+        }
+      } catch (NumberFormatException e) {
+        printUsage("Invalid num of clients: " + e.getMessage());
+      }
+    } else {
+      printUsage(null);
+    }
+
+    benchmark(new Path(basePath), readWriteRatio, testingCount, numClients);
+    return 0;
+  }
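+
+  // To compare the two lock implementations, run this tool twice against
+  // NameNodes configured with each lock provider and compare the elapsed
+  // times; throughput is roughly tasks / elapsed seconds. (Illustrative
+  // arithmetic: 101,100 tasks in 120 seconds is about 842 tasks/sec.)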
+
+  // Print the usage and exit.
+  private static void printUsage(String msg) {
+    if (msg != null) {
+      System.out.println(msg);
+    }
+    System.err.println("Usage: FSNLockBenchmarkThroughput "
+        + "<base path> <read write ratio> <testing count> <num clients>");
+    System.exit(1);
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    FileSystem fs = FileSystem.get(conf);
+    int res = ToolRunner.run(conf, new FSNLockBenchmarkThroughput(fs), args);
+    System.exit(res);
+  }
+}

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/fgl/TestFSNLockBenchmarkThroughput.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/fgl/TestFSNLockBenchmarkThroughput.java
new file mode 100644
index 00000000000..fa5d0814b59
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/fgl/TestFSNLockBenchmarkThroughput.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode.fgl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * To test {@link FSNLockBenchmarkThroughput}.
+ */
+public class TestFSNLockBenchmarkThroughput {
+
+  @Test
+  public void testFineGrainedLockingBenchmarkThroughput1() throws Exception {
+    testBenchmarkThroughput(true, 20, 100, 1000);
+  }
+
+  @Test
+  public void testFineGrainedLockingBenchmarkThroughput2() throws Exception {
+    testBenchmarkThroughput(true, 20, 1000, 1000);
+  }
+
+  @Test
+  public void testFineGrainedLockingBenchmarkThroughput3() throws Exception {
+    testBenchmarkThroughput(true, 10, 100, 100);
+  }
+
+  @Test
+  public void testGlobalLockingBenchmarkThroughput1() throws Exception {
+    testBenchmarkThroughput(false, 20, 100, 1000);
+  }
+
+  @Test
+  public void testGlobalLockingBenchmarkThroughput2() throws Exception {
+    testBenchmarkThroughput(false, 20, 1000, 1000);
+  }
+
+  @Test
+  public void testGlobalLockingBenchmarkThroughput3() throws Exception {
+    testBenchmarkThroughput(false, 10, 100, 100);
+  }
+
+  private void testBenchmarkThroughput(boolean enableFGL, int readWriteRatio,
+      int testingCount, int numClients) throws Exception {
+    MiniQJMHACluster qjmhaCluster = null;
+
+    try {
+      Configuration conf = new HdfsConfiguration();
+      conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+      conf.setInt(DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY, 500);
+      // Select the lock implementation under test.
+      if (enableFGL) {
+        conf.setClass(DFSConfigKeys.DFS_NAMENODE_LOCK_MODEL_PROVIDER_KEY,
+            FineGrainedFSNamesystemLock.class, FSNLockManager.class);
+      } else {
+        conf.setClass(DFSConfigKeys.DFS_NAMENODE_LOCK_MODEL_PROVIDER_KEY,
+            GlobalFSNamesystemLock.class, FSNLockManager.class);
+      }
+
+      MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf);
+      builder.getDfsBuilder().numDataNodes(10);
+      qjmhaCluster = builder.build();
+      MiniDFSCluster cluster = qjmhaCluster.getDfsCluster();
+
+      cluster.transitionToActive(0);
+      cluster.waitActive(0);
+
+      FileSystem fileSystem = cluster.getFileSystem(0);
+
+      String[] args = new String[]{"/tmp/fsnlock/benchmark/throughput",
+          String.valueOf(readWriteRatio), String.valueOf(testingCount),
+          String.valueOf(numClients)};
+
+      Assert.assertEquals(0, ToolRunner.run(conf,
+          new FSNLockBenchmarkThroughput(fileSystem), args));
+    } finally {
+      if (qjmhaCluster != null) {
+        qjmhaCluster.shutdown();
+      }
+    }
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org