HDFS-8968. Erasure coding: a comprehensive I/O throughput benchmark tool. Contributed by Rui Li.
Change-Id: I01ff5b04727fd79e6373582d4815c5e7b2096c67

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7b00c8e2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7b00c8e2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7b00c8e2

Branch: refs/heads/yarn-2877
Commit: 7b00c8e20ee62885097c5e63f110b9eece8ce6b3
Parents: 7f55a18
Author: Zhe Zhang <z...@apache.org>
Authored: Thu Nov 12 11:30:05 2015 -0800
Committer: Zhe Zhang <z...@apache.org>
Committed: Thu Nov 12 11:30:39 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/ErasureCodeBenchmarkThroughput.java    | 423 +++++++++++++++++++
 .../TestErasureCodeBenchmarkThroughput.java     | 116 +++++
 3 files changed, 542 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b00c8e2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e6a5b71..1b98d65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -201,6 +201,9 @@ Trunk (Unreleased)
     HDFS-9234. WebHdfs: getContentSummary() should give quota for storage
     types. (Surendra Singh Lilhore via xyao)
 
+    HDFS-8968. Erasure coding: a comprehensive I/O throughput benchmark tool.
+    (Rui Li via zhz)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b00c8e2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ErasureCodeBenchmarkThroughput.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ErasureCodeBenchmarkThroughput.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ErasureCodeBenchmarkThroughput.java
new file mode 100644
index 0000000..da4b321
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ErasureCodeBenchmarkThroughput.java
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
+import org.apache.hadoop.util.StopWatch;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class benchmarks client read/write throughput for both replicated
+ * and erasure-coded storage.
+ * <p/>
+ * Four operations are currently supported: read, write, generate and clean
+ * up data. Users should specify an operation, the amount of data in MB for a
+ * single client, and which storage policy to use, i.e. EC or replication.
+ * Optionally, users can specify the number of clients to launch concurrently.
+ * The tool launches 1 thread for each client; the number of clients is 1 by
+ * default. For reading, users can also specify whether stateful or positional
+ * read should be used. Stateful read is chosen by default.
+ * <p/>
+ * Each client reads and writes different files.
+ * For writing, each client writes a temporary file of the desired size, and
+ * the file is cleaned up when the test finishes.
+ * For reading, each client tries to read the file specific to itself, and
+ * simply returns if no such file exists. Therefore, users should generate
+ * the files before testing read. Generating data is essentially the same as
+ * writing, except that the files are not cleared at the end.
+ * For example, to test reading 1024MB of data with 10 clients, users should
+ * first generate 1024MB of data with 10 (or more) clients.
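+ * <p/>
+ * Example invocation via the HDFS tests jar (the jar name below is
+ * illustrative; it depends on the build version):
+ *   hadoop jar hadoop-hdfs-VERSION-tests.jar \
+ *       org.apache.hadoop.hdfs.ErasureCodeBenchmarkThroughput gen 1024 ec 10
+ *   hadoop jar hadoop-hdfs-VERSION-tests.jar \
+ *       org.apache.hadoop.hdfs.ErasureCodeBenchmarkThroughput read 1024 ec 10 pos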
+ */
+public class ErasureCodeBenchmarkThroughput extends Configured
+    implements Tool {
+
+  private static final int BUFFER_SIZE_MB = 128;
+  private static final String DFS_TMP_DIR = System.getProperty(
+      "test.benchmark.data", "/tmp/benchmark/data");
+  public static final String REP_DIR = DFS_TMP_DIR + "/replica";
+  public static final String EC_DIR = DFS_TMP_DIR + "/ec";
+  private static final String REP_FILE_BASE = "rep-file-";
+  private static final String EC_FILE_BASE = "ec-file-";
+  private static final String TMP_FILE_SUFFIX = ".tmp";
+  private static final ErasureCodingPolicy ecPolicy =
+      ErasureCodingPolicyManager.getSystemDefaultPolicy();
+  private static final byte[] data = new byte[BUFFER_SIZE_MB * 1024 * 1024];
+
+  static {
+    Random random = new Random();
+    random.nextBytes(data);
+  }
+
+  private final FileSystem fs;
+
+  public static ErasureCodingPolicy getEcPolicy() {
+    return ecPolicy;
+  }
+
+  public ErasureCodeBenchmarkThroughput(FileSystem fs) {
+    Preconditions.checkArgument(fs instanceof DistributedFileSystem);
+    this.fs = fs;
+  }
+
+  enum OpType {
+    READ, WRITE, GEN, CLEAN
+  }
+
+  public static String getFilePath(int dataSizeMB, boolean isEc) {
+    String parent = isEc ? EC_DIR : REP_DIR;
+    String file = isEc ? EC_FILE_BASE : REP_FILE_BASE;
+    return parent + "/" + file + dataSizeMB + "MB";
+  }
+
+  private static void printUsage(String msg) {
+    if (msg != null) {
+      System.err.println(msg);
+    }
+    System.err.println("Usage: ErasureCodeBenchmarkThroughput " +
+        "<read|write|gen|clean> <size in MB> <ec|rep> [num clients] [stf|pos]\n" +
+        "The stateful and positional options are only available for read.");
+    System.exit(1);
+  }
+
+  private List<Long> doBenchmark(boolean isRead, int dataSizeMB,
+      int numClients, boolean isEc, boolean statefulRead, boolean isGen)
+      throws Exception {
+    ExecutorService executor = Executors.newFixedThreadPool(numClients);
+    CompletionService<Long> cs = new ExecutorCompletionService<Long>(executor);
+    for (int i = 0; i < numClients; i++) {
+      cs.submit(isRead ?
+          new ReadCallable(dataSizeMB, isEc, i, statefulRead) :
+          new WriteCallable(dataSizeMB, isEc, i, isGen));
+    }
+    List<Long> results = new ArrayList<>(numClients);
+    for (int i = 0; i < numClients; i++) {
+      results.add(cs.take().get());
+    }
+    executor.shutdown();
+    return results;
+  }
+
+  private void setReadThreadPoolSize(int numClients) {
+    int numThread = numClients * ecPolicy.getNumDataUnits();
+    getConf().setInt(HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY,
+        numThread);
+  }
+
+  private DecimalFormat getDecimalFormat() {
+    return new DecimalFormat("#.##");
+  }
+
+  private void benchmark(OpType type, int dataSizeMB,
+      int numClients, boolean isEc, boolean statefulRead) throws Exception {
+    List<Long> sizes = null;
+    StopWatch sw = new StopWatch().start();
+    switch (type) {
+      case READ:
+        sizes = doBenchmark(true, dataSizeMB, numClients, isEc,
+            statefulRead, false);
+        break;
+      case WRITE:
+        sizes = doBenchmark(
+            false, dataSizeMB, numClients, isEc, statefulRead, false);
+        break;
+      case GEN:
+        sizes = doBenchmark(false, dataSizeMB, numClients, isEc,
+            statefulRead, true);
+        break;
+    }
+    long elapsedSec = sw.now(TimeUnit.SECONDS);
+    double totalDataSizeMB = 0;
+    for (Long size : sizes) {
+      if (size >= 0) {
+        totalDataSizeMB += size.doubleValue() / 1024 / 1024;
+      }
+    }
+    double throughput = totalDataSizeMB / elapsedSec;
+    DecimalFormat df = getDecimalFormat();
+    System.out.println(type + " " + df.format(totalDataSizeMB) +
+        " MB data takes: " + elapsedSec + " s.\nTotal throughput: " +
+        df.format(throughput) + " MB/s.");
+  }
+
+  private void setUpDir() throws IOException {
+    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    dfs.mkdirs(new Path(DFS_TMP_DIR));
+    Path repPath = new Path(REP_DIR);
+    Path ecPath = new Path(EC_DIR);
+    if (!dfs.exists(repPath)) {
+      dfs.mkdirs(repPath);
+    } else {
+      Preconditions.checkArgument(
+          dfs.getClient().getErasureCodingPolicy(repPath.toString()) == null);
+    }
+    if (!dfs.exists(ecPath)) {
+      dfs.mkdirs(ecPath);
+      dfs.getClient().setErasureCodingPolicy(ecPath.toString(), ecPolicy);
+    } else {
+      Preconditions.checkArgument(
+          dfs.getClient().
+              getErasureCodingPolicy(ecPath.toString()).equals(ecPolicy));
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    OpType type = null;
+    int dataSizeMB = 0;
+    boolean isEc = true;
+    int numClients = 1;
+    boolean statefulRead = true;
+    if (args.length >= 3) {
+      if (args[0].equals("read")) {
+        type = OpType.READ;
+      } else if (args[0].equals("write")) {
+        type = OpType.WRITE;
+      } else if (args[0].equals("gen")) {
+        type = OpType.GEN;
+      } else if (args[0].equals("clean")) {
+        type = OpType.CLEAN;
+      } else {
+        printUsage("Unknown operation: " + args[0]);
+      }
+      try {
+        dataSizeMB = Integer.parseInt(args[1]);
+        if (dataSizeMB <= 0) {
+          printUsage("Invalid data size: " + dataSizeMB);
+        }
+      } catch (NumberFormatException e) {
+        printUsage("Invalid data size: " + e.getMessage());
+      }
+      isEc = args[2].equals("ec");
+      if (!isEc && !args[2].equals("rep")) {
+        printUsage("Unknown storage policy: " + args[2]);
+      }
+    } else {
+      printUsage(null);
+    }
+    if (args.length >= 4 && type != OpType.CLEAN) {
+      try {
+        numClients = Integer.parseInt(args[3]);
+        if (numClients <= 0) {
+          printUsage("Invalid num of clients: " + numClients);
+        }
+      } catch (NumberFormatException e) {
+        printUsage("Invalid num of clients: " + e.getMessage());
+      }
+    }
+    if (args.length >= 5 && type == OpType.READ) {
+      statefulRead = args[4].equals("stf");
+      if (!statefulRead && !args[4].equals("pos")) {
+        printUsage("Unknown read mode: " + args[4]);
+      }
+    }
+
+    setUpDir();
+    if (type == OpType.CLEAN) {
+      cleanUp(dataSizeMB, isEc);
+    } else {
+      if (type == OpType.READ && isEc) {
+        setReadThreadPoolSize(numClients);
+      }
+      benchmark(type, dataSizeMB, numClients, isEc, statefulRead);
+    }
+    return 0;
+  }
+
+  private void cleanUp(int dataSizeMB, boolean isEc) throws IOException {
+    final String fileName = getFilePath(dataSizeMB, isEc);
+    Path path = isEc ? new Path(EC_DIR) : new Path(REP_DIR);
+    FileStatus[] fileStatuses = fs.listStatus(path, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.toString().contains(fileName);
+      }
+    });
+    for (FileStatus fileStatus : fileStatuses) {
+      fs.delete(fileStatus.getPath(), false);
+    }
+  }
+
+  /**
+   * A Callable that returns the number of bytes read/written.
+   */
+  private abstract class CallableBase implements Callable<Long> {
+    protected final int dataSizeMB;
+    protected final boolean isEc;
+    protected final int id;
+
+    public CallableBase(int dataSizeMB, boolean isEc, int id)
+        throws IOException {
+      this.dataSizeMB = dataSizeMB;
+      this.isEc = isEc;
+      this.id = id;
+    }
+
+    protected String getFilePathForThread() {
+      return getFilePath(dataSizeMB, isEc) + "_" + id;
+    }
+  }
+
+  private class WriteCallable extends CallableBase {
+    private final boolean isGen;
+
+    public WriteCallable(int dataSizeMB, boolean isEc, int id, boolean isGen)
+        throws IOException {
+      super(dataSizeMB, isEc, id);
+      this.isGen = isGen;
+    }
+
+    private long writeFile(Path path) throws IOException {
+      StopWatch sw = new StopWatch().start();
+      System.out.println("Writing " + path);
+      long dataSize = dataSizeMB * 1024 * 1024L;
+      long remaining = dataSize;
+      try (FSDataOutputStream outputStream = fs.create(path)) {
+        if (!isGen) {
+          fs.deleteOnExit(path);
+        }
+        int toWrite;
+        while (remaining > 0) {
+          toWrite = (int) Math.min(remaining, data.length);
+          outputStream.write(data, 0, toWrite);
+          remaining -= toWrite;
+        }
Time taken: " + + sw.now(TimeUnit.SECONDS) + " s."); + return dataSize - remaining; + } + } + + @Override + public Long call() throws Exception { + String pathStr = getFilePathForThread(); + if (!isGen) { + pathStr += TMP_FILE_SUFFIX; + } + final Path path = new Path(pathStr); + if (fs.exists(path)) { + if (isGen) { + System.out.println("Data already generated at " + path); + } else { + System.out.println("Previous tmp data not cleaned " + path); + } + return 0L; + } + return writeFile(path); + } + } + + private class ReadCallable extends CallableBase { + private final boolean statefulRead; + + public ReadCallable(int dataSizeMB, boolean isEc, int id, + boolean statefulRead) throws IOException { + super(dataSizeMB, isEc, id); + this.statefulRead = statefulRead; + } + + private long doStateful(FSDataInputStream inputStream) throws IOException { + long count = 0; + long bytesRead; + ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE_MB * 1024 * 1024); + while (true) { + bytesRead = inputStream.read(buffer); + if (bytesRead < 0) { + break; + } + count += bytesRead; + buffer.clear(); + } + return count; + } + + private long doPositional(FSDataInputStream inputStream) + throws IOException { + long count = 0; + long bytesRead; + byte buf[] = new byte[BUFFER_SIZE_MB * 1024 * 1024]; + while (true) { + bytesRead = inputStream.read(count, buf, 0, buf.length); + if (bytesRead < 0) { + break; + } + count += bytesRead; + } + return count; + } + + private long readFile(Path path) throws IOException { + try (FSDataInputStream inputStream = fs.open(path)) { + StopWatch sw = new StopWatch().start(); + System.out.println((statefulRead ? "Stateful reading " : + "Positional reading ") + path); + long totalRead = statefulRead ? doStateful(inputStream) : + doPositional(inputStream); + System.out.println( + (statefulRead ? "Finished stateful read " : + "Finished positional read ") + path + ". Time taken: " + + sw.now(TimeUnit.SECONDS) + " s."); + return totalRead; + } + } + + @Override + public Long call() throws Exception { + Path path = new Path(getFilePathForThread()); + if (!fs.exists(path) || fs.isDirectory(path)) { + System.out.println("File not found at " + path + + ". Call gen first?"); + return 0L; + } + long bytesRead = readFile(path); + long dataSize = dataSizeMB * 1024 * 1024L; + Preconditions.checkArgument(bytesRead == dataSize, + "Specified data size: " + dataSize + ", actually read " + bytesRead); + return bytesRead; + } + } + + public static void main(String[] args) throws Exception { + Configuration conf = new HdfsConfiguration(); + FileSystem fs = FileSystem.get(conf); + int res = ToolRunner.run(conf, + new ErasureCodeBenchmarkThroughput(fs), args); + System.exit(res); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7b00c8e2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodeBenchmarkThroughput.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodeBenchmarkThroughput.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodeBenchmarkThroughput.java new file mode 100644 index 0000000..0b409bc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodeBenchmarkThroughput.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Tests {@link org.apache.hadoop.hdfs.ErasureCodeBenchmarkThroughput}.
+ */
+public class TestErasureCodeBenchmarkThroughput {
+  private static MiniDFSCluster cluster;
+  private static Configuration conf;
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    conf = new HdfsConfiguration();
+    int numDN = ErasureCodeBenchmarkThroughput.getEcPolicy().getNumDataUnits() +
+        ErasureCodeBenchmarkThroughput.getEcPolicy().getNumParityUnits();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDN).build();
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown(true);
+    }
+  }
+
+  private static void runBenchmark(String[] args) throws Exception {
+    Assert.assertNotNull(conf);
+    Assert.assertNotNull(fs);
+    Assert.assertEquals(0, ToolRunner.run(conf,
+        new ErasureCodeBenchmarkThroughput(fs), args));
+  }
+
+  private static void verifyNumFile(final int dataSize, final boolean isEc,
+      int numFile) throws IOException {
+    Path path = isEc ? new Path(ErasureCodeBenchmarkThroughput.EC_DIR) :
+        new Path(ErasureCodeBenchmarkThroughput.REP_DIR);
+    FileStatus[] statuses = fs.listStatus(path, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return path.toString().contains(
+            ErasureCodeBenchmarkThroughput.getFilePath(dataSize, isEc));
+      }
+    });
+    Assert.assertEquals(numFile, statuses.length);
+  }
+
+  @Test
+  public void testReplicaReadWrite() throws Exception {
+    Integer dataSize = 10;
+    Integer numClient = 3;
+    String[] args = new String[]{"write", dataSize.toString(), "rep",
+        numClient.toString()};
+    runBenchmark(args);
+    args[0] = "gen";
+    runBenchmark(args);
+    args[0] = "read";
+    runBenchmark(args);
+  }
+
+  @Test
+  public void testECReadWrite() throws Exception {
+    Integer dataSize = 5;
+    Integer numClient = 5;
+    String[] args = new String[]{"write", dataSize.toString(), "ec",
+        numClient.toString()};
+    runBenchmark(args);
+    args[0] = "gen";
+    runBenchmark(args);
+    args[0] = "read";
+    runBenchmark(args);
+  }
+
+  @Test
+  public void testCleanUp() throws Exception {
+    Integer dataSize = 5;
+    Integer numClient = 5;
+    String[] args = new String[]{"gen", dataSize.toString(), "ec",
+        numClient.toString()};
+    runBenchmark(args);
+    args[0] = "clean";
+    runBenchmark(args);
+    verifyNumFile(dataSize, true, 0);
+  }
+}
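
To try the new test on its own, a Maven invocation along these lines should
work (the module path is taken from the diff above; exact flags depend on the
local build setup):

    mvn test -pl hadoop-hdfs-project/hadoop-hdfs -Dtest=TestErasureCodeBenchmarkThroughput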