HDFS-10994. Support an XOR policy XOR-2-1-64k in HDFS. Contributed by Sammi Chen
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5d5614f8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5d5614f8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5d5614f8 Branch: refs/heads/HADOOP-13345 Commit: 5d5614f847b2ef2a5b70bd9a06edc4eba06174c6 Parents: 209e805 Author: Kai Zheng <[email protected]> Authored: Mon Nov 28 14:34:44 2016 +0800 Committer: Kai Zheng <[email protected]> Committed: Mon Nov 28 14:34:44 2016 +0800 ---------------------------------------------------------------------- .../io/erasurecode/ErasureCodeConstants.java | 3 + .../hadoop/hdfs/protocol/HdfsConstants.java | 1 + .../namenode/ErasureCodingPolicyManager.java | 23 +- .../hadoop/hdfs/server/namenode/INodeFile.java | 8 +- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 28 +- .../hadoop/hdfs/TestDFSStripedInputStream.java | 50 ++- .../hadoop/hdfs/TestDFSStripedOutputStream.java | 27 +- .../TestDFSStripedOutputStreamWithFailure.java | 37 +- .../hdfs/TestDFSXORStripedInputStream.java | 33 ++ .../hdfs/TestDFSXORStripedOutputStream.java | 35 ++ ...estDFSXORStripedOutputStreamWithFailure.java | 36 ++ ...tyPreemptionPolicyForReservedContainers.java | 430 ------------------- 12 files changed, 240 insertions(+), 471 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5614f8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCodeConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCodeConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCodeConstants.java index 8d6ff85..ffa0bce 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCodeConstants.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCodeConstants.java @@ -38,4 +38,7 @@ public final class ErasureCodeConstants { public static final ECSchema RS_6_3_LEGACY_SCHEMA = new ECSchema( RS_LEGACY_CODEC_NAME, 6, 3); + + public static final ECSchema XOR_2_1_SCHEMA = new ECSchema( + XOR_CODEC_NAME, 2, 1); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5614f8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java index acbc8f6..b55b4df 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java @@ -147,6 +147,7 @@ public final class HdfsConstants { public static final byte RS_6_3_POLICY_ID = 0; public static final byte RS_3_2_POLICY_ID = 1; public static final byte RS_6_3_LEGACY_POLICY_ID = 2; + public static final byte XOR_2_1_POLICY_ID = 3; /* Hidden constructor */ protected HdfsConstants() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5614f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java index c4bc8de..8a85d23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java @@ -36,7 +36,7 @@ import java.util.TreeMap; public final class ErasureCodingPolicyManager { /** - * TODO: HDFS-8095 + * TODO: HDFS-8095. */ private static final int DEFAULT_CELLSIZE = 64 * 1024; private static final ErasureCodingPolicy SYS_POLICY1 = @@ -48,10 +48,14 @@ public final class ErasureCodingPolicyManager { private static final ErasureCodingPolicy SYS_POLICY3 = new ErasureCodingPolicy(ErasureCodeConstants.RS_6_3_LEGACY_SCHEMA, DEFAULT_CELLSIZE, HdfsConstants.RS_6_3_LEGACY_POLICY_ID); + private static final ErasureCodingPolicy SYS_POLICY4 = + new ErasureCodingPolicy(ErasureCodeConstants.XOR_2_1_SCHEMA, + DEFAULT_CELLSIZE, HdfsConstants.XOR_2_1_POLICY_ID); //We may add more later. private static final ErasureCodingPolicy[] SYS_POLICIES = - new ErasureCodingPolicy[]{SYS_POLICY1, SYS_POLICY2, SYS_POLICY3}; + new ErasureCodingPolicy[]{SYS_POLICY1, SYS_POLICY2, SYS_POLICY3, + SYS_POLICY4}; // Supported storage policies for striped EC files private static final byte[] SUITABLE_STORAGE_POLICIES_FOR_EC_STRIPED_MODE = new byte[] { @@ -97,6 +101,19 @@ public final class ErasureCodingPolicyManager { } /** + * Get system-wide policy by policy ID. + * @return ecPolicy + */ + public static ErasureCodingPolicy getPolicyByPolicyID(byte id) { + for (ErasureCodingPolicy policy : SYS_POLICIES) { + if (policy.getId() == id) { + return policy; + } + } + return null; + } + + /** * Get all policies that's available to use. * @return all policies */ @@ -141,7 +158,7 @@ public final class ErasureCodingPolicyManager { } /** - * Clear and clean up + * Clear and clean up. */ public void clear() { activePoliciesByName.clear(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5614f8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index 37f97db..a5dcee9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -455,9 +455,13 @@ public class INodeFile extends INodeWithAdditionalFields if(!isStriped()){ return max; } - // TODO support more policies based on policyId + ErasureCodingPolicy ecPolicy = - ErasureCodingPolicyManager.getSystemDefaultPolicy(); + ErasureCodingPolicyManager.getPolicyByPolicyID( + getErasureCodingPolicyID()); + if (ecPolicy == null){ + ecPolicy = ErasureCodingPolicyManager.getSystemDefaultPolicy(); + } return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5614f8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 13e2656..1fbc1d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1888,21 +1888,41 @@ public class DFSTestUtil { * Creates the metadata of a file in striped layout. This method only * manipulates the NameNode state without injecting data to DataNode. * You should disable periodical heartbeat before use this. - * @param file Path of the file to create + * @param file Path of the file to create * @param dir Parent path of the file * @param numBlocks Number of striped block groups to add to the file * @param numStripesPerBlk Number of striped cells in each block * @param toMkdir */ - public static void createStripedFile(MiniDFSCluster cluster, Path file, Path dir, - int numBlocks, int numStripesPerBlk, boolean toMkdir) throws Exception { + public static void createStripedFile(MiniDFSCluster cluster, Path file, + Path dir, int numBlocks, int numStripesPerBlk, boolean toMkdir) + throws Exception { + createStripedFile(cluster, file, dir, numBlocks, numStripesPerBlk, + toMkdir, null); + } + + /** + * Creates the metadata of a file in striped layout. This method only + * manipulates the NameNode state without injecting data to DataNode. + * You should disable periodical heartbeat before use this. + * @param file Path of the file to create + * @param dir Parent path of the file + * @param numBlocks Number of striped block groups to add to the file + * @param numStripesPerBlk Number of striped cells in each block + * @param toMkdir + * @param ecPolicy erasure coding policy apply to created file. A null value + * means using default erasure coding policy. + */ + public static void createStripedFile(MiniDFSCluster cluster, Path file, + Path dir, int numBlocks, int numStripesPerBlk, boolean toMkdir, + ErasureCodingPolicy ecPolicy) throws Exception { DistributedFileSystem dfs = cluster.getFileSystem(); // If outer test already set EC policy, dir should be left as null if (toMkdir) { assert dir != null; dfs.mkdirs(dir); try { - dfs.getClient().setErasureCodingPolicy(dir.toString(), null); + dfs.getClient().setErasureCodingPolicy(dir.toString(), ecPolicy); } catch (IOException e) { if (!e.getMessage().contains("non-empty directory")) { throw e; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5614f8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java index 3b46c66..121b9a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java @@ -64,20 +64,34 @@ public class TestDFSStripedInputStream { private DistributedFileSystem fs; private final Path dirPath = new Path("/striped"); private Path filePath = new Path(dirPath, "file"); - private final ErasureCodingPolicy ecPolicy = - ErasureCodingPolicyManager.getSystemDefaultPolicy(); - private final short dataBlocks = (short) ecPolicy.getNumDataUnits(); - private final short parityBlocks = (short) ecPolicy.getNumParityUnits(); - private final int cellSize = ecPolicy.getCellSize(); + private ErasureCodingPolicy ecPolicy; + private short dataBlocks; + private short parityBlocks; + private int cellSize; private final int stripesPerBlock = 2; - private final int blockSize = stripesPerBlock * cellSize; - private final int blockGroupSize = dataBlocks * blockSize; + private int blockSize; + private int blockGroupSize; @Rule public Timeout globalTimeout = new Timeout(300000); + public ErasureCodingPolicy getEcPolicy() { + return ErasureCodingPolicyManager.getSystemDefaultPolicy(); + } + @Before public void setup() throws IOException { + /* + * Initialize erasure coding policy. + */ + ecPolicy = getEcPolicy(); + dataBlocks = (short) ecPolicy.getNumDataUnits(); + parityBlocks = (short) ecPolicy.getNumParityUnits(); + cellSize = ecPolicy.getCellSize(); + blockSize = stripesPerBlock * cellSize; + blockGroupSize = dataBlocks * blockSize; + System.out.println("EC policy = " + ecPolicy); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0); if (ErasureCodeNative.isNativeCodeLoaded()) { @@ -94,7 +108,7 @@ public class TestDFSStripedInputStream { } fs = cluster.getFileSystem(); fs.mkdirs(dirPath); - fs.getClient().setErasureCodingPolicy(dirPath.toString(), null); + fs.getClient().setErasureCodingPolicy(dirPath.toString(), ecPolicy); } @After @@ -106,13 +120,13 @@ public class TestDFSStripedInputStream { } /** - * Test {@link DFSStripedInputStream#getBlockAt(long)} + * Test {@link DFSStripedInputStream#getBlockAt(long)}. */ @Test public void testRefreshBlock() throws Exception { final int numBlocks = 4; DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, - stripesPerBlock, false); + stripesPerBlock, false, ecPolicy); LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( filePath.toString(), 0, blockGroupSize * numBlocks); final DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(), @@ -136,7 +150,7 @@ public class TestDFSStripedInputStream { public void testPread() throws Exception { final int numBlocks = 2; DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, - stripesPerBlock, false); + stripesPerBlock, false, ecPolicy); LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( filePath.toString(), 0, blockGroupSize * numBlocks); int fileLen = blockGroupSize * numBlocks; @@ -154,7 +168,9 @@ public class TestDFSStripedInputStream { bg.getBlock().getBlockPoolId()); } - /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */ + /** + * A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks + */ for (int i = 0; i < stripesPerBlock; i++) { for (int j = 0; j < dataBlocks; j++) { for (int k = 0; k < cellSize; k++) { @@ -194,7 +210,7 @@ public class TestDFSStripedInputStream { final int numBlocks = 4; final int failedDNIdx = dataBlocks - 1; DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, - stripesPerBlock, false); + stripesPerBlock, false, ecPolicy); LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( filePath.toString(), 0, blockGroupSize); @@ -305,7 +321,7 @@ public class TestDFSStripedInputStream { setup(); } DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, - stripesPerBlock, false); + stripesPerBlock, false, ecPolicy); LocatedBlocks lbs = fs.getClient().namenode. getBlockLocations(filePath.toString(), 0, fileSize); @@ -330,7 +346,9 @@ public class TestDFSStripedInputStream { byte[] expected = new byte[fileSize]; for (LocatedBlock bg : lbs.getLocatedBlocks()) { - /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */ + /** + * A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks + */ for (int i = 0; i < stripesPerBlock; i++) { for (int j = 0; j < dataBlocks; j++) { for (int k = 0; k < cellSize; k++) { @@ -371,7 +389,7 @@ public class TestDFSStripedInputStream { final int numBlocks = 4; final int failedDNIdx = dataBlocks - 1; DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, - stripesPerBlock, false); + stripesPerBlock, false, ecPolicy); LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( filePath.toString(), 0, blockGroupSize); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5614f8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java index b686f28..5bde16e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java @@ -47,23 +47,36 @@ public class TestDFSStripedOutputStream { GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL); } - private final ErasureCodingPolicy ecPolicy = - ErasureCodingPolicyManager.getSystemDefaultPolicy(); - private final int dataBlocks = ecPolicy.getNumDataUnits(); - private final int parityBlocks = ecPolicy.getNumParityUnits(); + private ErasureCodingPolicy ecPolicy; + private int dataBlocks; + private int parityBlocks; private MiniDFSCluster cluster; private DistributedFileSystem fs; private Configuration conf; - private final int cellSize = ecPolicy.getCellSize(); + private int cellSize; private final int stripesPerBlock = 4; - private final int blockSize = cellSize * stripesPerBlock; + private int blockSize; @Rule public Timeout globalTimeout = new Timeout(300000); + public ErasureCodingPolicy getEcPolicy() { + return ErasureCodingPolicyManager.getSystemDefaultPolicy(); + } + @Before public void setup() throws IOException { + /* + * Initialize erasure coding policy. + */ + ecPolicy = getEcPolicy(); + dataBlocks = (short) ecPolicy.getNumDataUnits(); + parityBlocks = (short) ecPolicy.getNumParityUnits(); + cellSize = ecPolicy.getCellSize(); + blockSize = stripesPerBlock * cellSize; + System.out.println("EC policy = " + ecPolicy); + int numDNs = dataBlocks + parityBlocks + 2; conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); @@ -76,7 +89,7 @@ public class TestDFSStripedOutputStream { NativeRSRawErasureCoderFactory.class.getCanonicalName()); } cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); - cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null); + cluster.getFileSystem().getClient().setErasureCodingPolicy("/", ecPolicy); fs = cluster.getFileSystem(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5614f8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java index cde07a4..0baf9cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java @@ -47,6 +47,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Assume; +import org.junit.Before; import org.junit.Test; import java.io.IOException; @@ -76,18 +77,36 @@ public class TestDFSStripedOutputStreamWithFailure { .getLogger().setLevel(Level.ALL); } - private final ErasureCodingPolicy ecPolicy = - ErasureCodingPolicyManager.getSystemDefaultPolicy(); - private final int dataBlocks = ecPolicy.getNumDataUnits(); - private final int parityBlocks = ecPolicy.getNumParityUnits(); - private final int cellSize = ecPolicy.getCellSize(); + private ErasureCodingPolicy ecPolicy; + private int dataBlocks; + private int parityBlocks; + private int cellSize; private final int stripesPerBlock = 4; - private final int blockSize = cellSize * stripesPerBlock; - private final int blockGroupSize = blockSize * dataBlocks; + private int blockSize; + private int blockGroupSize; private static final int FLUSH_POS = 9 * DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1; + public ErasureCodingPolicy getEcPolicy() { + return ErasureCodingPolicyManager.getSystemDefaultPolicy(); + } + + /* + * Initialize erasure coding policy. + */ + @Before + public void init(){ + ecPolicy = getEcPolicy(); + dataBlocks = ecPolicy.getNumDataUnits(); + parityBlocks = ecPolicy.getNumParityUnits(); + cellSize = ecPolicy.getCellSize(); + blockSize = cellSize * stripesPerBlock; + blockGroupSize = blockSize * dataBlocks; + dnIndexSuite = getDnIndexSuite(); + lengths = newLengths(); + } + List<Integer> newLengths() { final List<Integer> lens = new ArrayList<>(); lens.add(FLUSH_POS + 2); @@ -104,7 +123,7 @@ public class TestDFSStripedOutputStreamWithFailure { return lens; } - private final int[][] dnIndexSuite = getDnIndexSuite(); + private int[][] dnIndexSuite; private int[][] getDnIndexSuite() { final int maxNumLevel = 2; @@ -167,7 +186,7 @@ public class TestDFSStripedOutputStreamWithFailure { return positions; } - private final List<Integer> lengths = newLengths(); + private List<Integer> lengths; Integer getLength(int i) { return i >= 0 && i < lengths.size()? lengths.get(i): null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5614f8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedInputStream.java new file mode 100644 index 0000000..75062e0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedInputStream.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; + +/** + * This tests read operation of DFS striped file with XOR-2-1-64k erasure code + * policy. + */ +public class TestDFSXORStripedInputStream extends TestDFSStripedInputStream{ + + public ErasureCodingPolicy getEcPolicy() { + return ErasureCodingPolicyManager.getPolicyByPolicyID( + HdfsConstants.XOR_2_1_POLICY_ID); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5614f8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStream.java new file mode 100644 index 0000000..64bddb8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStream.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; + +/** + * This tests write operation of DFS striped file with XOR-2-1-64k erasure code + * policy. + */ +public class TestDFSXORStripedOutputStream extends TestDFSStripedOutputStream{ + + @Override + public ErasureCodingPolicy getEcPolicy() { + return ErasureCodingPolicyManager.getPolicyByPolicyID( + HdfsConstants.XOR_2_1_POLICY_ID); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5614f8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStreamWithFailure.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStreamWithFailure.java new file mode 100644 index 0000000..ed361a8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStreamWithFailure.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; + +/** + * This tests write operation of DFS striped file with XOR-2-1-64k erasure code + * policy when there is data node failure. + */ +public class TestDFSXORStripedOutputStreamWithFailure + extends TestDFSStripedOutputStreamWithFailure{ + + @Override + public ErasureCodingPolicy getEcPolicy() { + return ErasureCodingPolicyManager.getPolicyByPolicyID( + HdfsConstants.XOR_2_1_POLICY_ID); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5d5614f8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java deleted file mode 100644 index 38b2e78..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java +++ /dev/null @@ -1,430 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; - -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; - -import static org.mockito.Matchers.argThat; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -public class TestProportionalCapacityPreemptionPolicyForReservedContainers - extends ProportionalCapacityPreemptionPolicyMockFramework { - @Before - public void setup() { - super.setup(); - conf.setBoolean( - CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS, - true); - policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); - } - - @Test - public void testPreemptionForSimpleReservedContainer() throws IOException { - /** - * The simplest test of reserved container, Queue structure is: - * - * <pre> - * root - * / \ - * a b - * </pre> - * Guaranteed resource of a/b are 50:50 - * Total cluster resource = 100 - * - A has 90 containers on two node, n1 has 45, n2 has 45, size of each - * container is 1. - * - B has am container at n1, and reserves 1 container with size = 9 at n1, - * so B needs to preempt 9 containers from A at n1 instead of randomly - * preempt from n1 and n2. - */ - String labelsConfig = - "=100,true;"; - String nodesConfig = // n1 / n2 has no label - "n1= res=50;" + - "n2= res=50"; - String queuesConfig = - // guaranteed,max,used,pending,reserved - "root(=[100 100 100 9 9]);" + //root - "-a(=[50 100 90 0]);" + // a - "-b(=[50 100 10 9 9])"; // b - String appsConfig= - //queueName\t(priority,resource,host,expression,#repeat,reserved) - "a\t" // app1 in a - + "(1,1,n1,,45,false)" // 45 in n1 - + "(1,1,n2,,45,false);" + // 45 in n2 - "b\t" // app2 in b - + "(1,1,n1,,1,false)" // AM container in n1 - + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 - - buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); - policy.editSchedule(); - - // Total 5 preempted from app1 at n1, don't preempt container from other - // app/node - verify(mDisp, times(5)).handle(argThat( - new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( - getAppAttemptId(1)))); - verify(mDisp, times(5)).handle( - argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", - NodeId.newInstance("n1", 1)))); - verify(mDisp, times(0)).handle(argThat( - new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( - getAppAttemptId(2)))); - } - - @Test - public void testUseReservedAndFifoSelectorTogether() throws IOException { - /** - * Queue structure is: - * - * <pre> - * root - * / \ - * a b - * </pre> - * Guaranteed resource of a/b are 30:70 - * Total cluster resource = 100 - * - A has 45 containers on two node, n1 has 10, n2 has 35, size of each - * container is 1. - * - B has 5 containers at n2, and reserves 1 container with size = 50 at n1, - * B also has 20 pending resources. - * so B needs to preempt: - * - 10 containers from n1 (for reserved) - * - 5 containers from n2 for pending resources - */ - String labelsConfig = - "=100,true;"; - String nodesConfig = // n1 / n2 has no label - "n1= res=50;" + - "n2= res=50"; - String queuesConfig = - // guaranteed,max,used,pending,reserved - "root(=[100 100 100 70 10]);" + //root - "-a(=[30 100 45 0]);" + // a - "-b(=[70 100 55 70 50])"; // b - String appsConfig= - //queueName\t(priority,resource,host,expression,#repeat,reserved) - "a\t" // app1 in a - + "(1,1,n2,,35,false)" // 35 in n2 - + "(1,1,n1,,10,false);" + // 10 in n1 - "b\t" // app2 in b - + "(1,1,n2,,5,false)" // 5 in n2 - + "(1,50,n1,,1,true)"; // 1 container with size=50 reserved at n1 - - buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); - policy.editSchedule(); - - verify(mDisp, times(15)).handle(argThat( - new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( - getAppAttemptId(1)))); - verify(mDisp, times(10)).handle( - argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", - NodeId.newInstance("n1", 1)))); - verify(mDisp, times(5)).handle( - argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", - NodeId.newInstance("n2", 1)))); - verify(mDisp, times(0)).handle(argThat( - new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( - getAppAttemptId(2)))); - } - - @Test - public void testReservedSelectorSkipsAMContainer() throws IOException { - /** - * Queue structure is: - * - * <pre> - * root - * / \ - * a b - * </pre> - * Guaranteed resource of a/b are 30:70 - * Total cluster resource = 100 - * - A has 45 containers on two node, n1 has 10, n2 has 35, size of each - * container is 1. - * - B has 5 containers at n2, and reserves 1 container with size = 50 at n1, - * B also has 20 pending resources. - * - * Ideally B needs to preempt: - * - 10 containers from n1 (for reserved) - * - 5 containers from n2 for pending resources - * - * However, since one AM container is located at n1 (from queueA), we cannot - * preempt 10 containers from n1 for reserved container. Instead, we will - * preempt 15 containers from n2, since containers from queueA launched in n2 - * are later than containers from queueA launched in n1 (FIFO order of containers) - */ - String labelsConfig = - "=100,true;"; - String nodesConfig = // n1 / n2 has no label - "n1= res=50;" + - "n2= res=50"; - String queuesConfig = - // guaranteed,max,used,pending,reserved - "root(=[100 100 100 70 10]);" + //root - "-a(=[30 100 45 0]);" + // a - "-b(=[70 100 55 70 50])"; // b - String appsConfig= - //queueName\t(priority,resource,host,expression,#repeat,reserved) - "a\t" // app1 in a - + "(1,1,n1,,10,false)" // 10 in n1 - + "(1,1,n2,,35,false);" +// 35 in n2 - "b\t" // app2 in b - + "(1,1,n2,,5,false)" // 5 in n2 - + "(1,50,n1,,1,true)"; // 1 container with size=50 reserved at n1 - - buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); - policy.editSchedule(); - - verify(mDisp, times(15)).handle(argThat( - new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( - getAppAttemptId(1)))); - verify(mDisp, times(0)).handle( - argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", - NodeId.newInstance("n1", 1)))); - verify(mDisp, times(15)).handle( - argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", - NodeId.newInstance("n2", 1)))); - verify(mDisp, times(0)).handle(argThat( - new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( - getAppAttemptId(2)))); - } - - @Test - public void testPreemptionForReservedContainerRespectGuaranteedResource() - throws IOException { - /** - * Queue structure is: - * - * <pre> - * root - * / \ - * a b - * </pre> - * Guaranteed resource of a/b are 85:15 - * Total cluster resource = 100 - * - A has 90 containers on two node, n1 has 45, n2 has 45, size of each - * container is 1. - * - B has am container at n1, and reserves 1 container with size = 9 at n1, - * - * If we preempt 9 containers from queue-A, queue-A will be below its - * guaranteed resource = 90 - 9 = 81 < 85. - * - * So no preemption will take place - */ - String labelsConfig = - "=100,true;"; - String nodesConfig = // n1 / n2 has no label - "n1= res=50;" + - "n2= res=50"; - String queuesConfig = - // guaranteed,max,used,pending,reserved - "root(=[100 100 100 9 9]);" + //root - "-a(=[85 100 90 0]);" + // a - "-b(=[15 100 10 9 9])"; // b - String appsConfig= - //queueName\t(priority,resource,host,expression,#repeat,reserved) - "a\t" // app1 in a - + "(1,1,n1,,45,false)" // 45 in n1 - + "(1,1,n2,,45,false);" + // 45 in n2 - "b\t" // app2 in b - + "(1,1,n1,,1,false)" // AM container in n1 - + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 - - buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); - policy.editSchedule(); - - verify(mDisp, times(0)).handle(argThat( - new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( - getAppAttemptId(1)))); - verify(mDisp, times(0)).handle(argThat( - new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( - getAppAttemptId(2)))); - } - - @Test - public void testPreemptionForReservedContainerWhichHasAvailableResource() - throws IOException { - /** - * Queue structure is: - * - * <pre> - * root - * / \ - * a b - * </pre> - * - * Guaranteed resource of a/b are 50:50 - * Total cluster resource = 100 - * - A has 90 containers on two node, n1 has 45, n2 has 45, size of each - * container is 1. - * - B has am container at n1, and reserves 1 container with size = 9 at n1, - * - * So we can get 4 containers preempted after preemption. - * (reserved 5 + preempted 4) = 9 - */ - String labelsConfig = - "=100,true;"; - String nodesConfig = // n1 / n2 has no label - "n1= res=50;" + - "n2= res=50"; - String queuesConfig = - // guaranteed,max,used,pending,reserved - "root(=[100 100 99 9 9]);" + //root - "-a(=[50 100 90 0]);" + // a - "-b(=[50 100 9 9 9])"; // b - String appsConfig= - //queueName\t(priority,resource,host,expression,#repeat,reserved) - "a\t" // app1 in a - + "(1,1,n1,,45,false)" // 45 in n1 - + "(1,1,n2,,45,false);" + // 45 in n2 - "b\t" // app2 in b - + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 - - buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); - policy.editSchedule(); - - // Total 4 preempted from app1 at n1, don't preempt container from other - // app/node - verify(mDisp, times(4)).handle(argThat( - new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", - NodeId.newInstance("n1", 1)))); - verify(mDisp, times(0)).handle(argThat( - new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", - NodeId.newInstance("n2", 1)))); - } - - @Test - public void testPreemptionForReservedContainerWhichHasNondivisibleAvailableResource() - throws IOException { - /** - * Queue structure is: - * - * <pre> - * root - * / \ - * a b - * </pre> - * - * Guaranteed resource of a/b are 50:50 - * Total cluster resource = 100 - * - A has 45 containers on two node, size of each container is 2, - * n1 has 23, n2 has 22 - * - B reserves 1 container with size = 9 at n1, - * - * So we can get 4 containers (total-resource = 8) preempted after - * preemption. Actual required is 3.5, but we need to preempt integer - * number of containers - */ - String labelsConfig = - "=100,true;"; - String nodesConfig = // n1 / n2 has no label - "n1= res=50;" + - "n2= res=50"; - String queuesConfig = - // guaranteed,max,used,pending,reserved - "root(=[100 100 99 9 9]);" + //root - "-a(=[50 100 90 0]);" + // a - "-b(=[50 100 9 9 9])"; // b - String appsConfig= - //queueName\t(priority,resource,host,expression,#repeat,reserved) - "a\t" // app1 in a - + "(1,2,n1,,24,false)" // 48 in n1 - + "(1,2,n2,,23,false);" + // 46 in n2 - "b\t" // app2 in b - + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 - - buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); - policy.editSchedule(); - - // Total 4 preempted from app1 at n1, don't preempt container from other - // app/node - verify(mDisp, times(4)).handle(argThat( - new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", - NodeId.newInstance("n1", 1)))); - verify(mDisp, times(0)).handle(argThat( - new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", - NodeId.newInstance("n2", 1)))); - } - - @Test - public void testPreemptionForReservedContainerRespectAvailableResources() - throws IOException { - /** - * Queue structure is: - * - * <pre> - * root - * / \ - * a b - * </pre> - * - * Guaranteed resource of a/b are 50:50 - * Total cluster resource = 100, 4 nodes, 25 on each node - * - A has 10 containers on every node, size of container is 2 - * - B reserves 1 container with size = 9 at n1, - * - * So even if we cannot allocate container for B now, no preemption should - * happen since there're plenty of available resources. - */ - String labelsConfig = - "=100,true;"; - String nodesConfig = - "n1= res=25;" + - "n2= res=25;" + - "n3= res=25;" + - "n4= res=25;"; - String queuesConfig = - // guaranteed,max,used,pending,reserved - "root(=[100 100 89 9 9]);" + //root - "-a(=[50 100 80 0]);" + // a - "-b(=[50 100 9 9 9])"; // b - String appsConfig= - //queueName\t(priority,resource,host,expression,#repeat,reserved) - "a\t" // app1 in a - + "(1,2,n1,,10,false)" // 10 in n1 - + "(1,2,n2,,10,false)" // 10 in n2 - + "(1,2,n3,,10,false)" // 10 in n3 - + "(1,2,n4,,10,false);" + // 10 in n4 - "b\t" // app2 in b - + "(1,9,n1,,1,true)"; // 1 container with size=5 reserved at n1 - - buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); - policy.editSchedule(); - - // No preemption should happen - verify(mDisp, times(0)).handle(argThat( - new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", - NodeId.newInstance("n1", 1)))); - verify(mDisp, times(0)).handle(argThat( - new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", - NodeId.newInstance("n2", 1)))); - verify(mDisp, times(0)).handle(argThat( - new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", - NodeId.newInstance("n3", 1)))); - verify(mDisp, times(0)).handle(argThat( - new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", - NodeId.newInstance("n4", 1)))); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
