Revert due to an error "HDFS-10994. Support an XOR policy XOR-2-1-64k in HDFS. Contributed by Sammi Chen"
This reverts commit 5614f847b2ef2a5b70bd9a06edc4eba06174c6. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cfd8076f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cfd8076f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cfd8076f Branch: refs/heads/YARN-5085 Commit: cfd8076f81930c3ffea8ec2ef42926217b83ab1a Parents: aeecfa2 Author: Kai Zheng <kai.zh...@intel.com> Authored: Wed Nov 30 15:44:52 2016 +0800 Committer: Kai Zheng <kai.zh...@intel.com> Committed: Wed Nov 30 15:44:52 2016 +0800 ---------------------------------------------------------------------- .../io/erasurecode/ErasureCodeConstants.java | 3 - .../hadoop/hdfs/protocol/HdfsConstants.java | 1 - .../namenode/ErasureCodingPolicyManager.java | 23 +- .../hadoop/hdfs/server/namenode/INodeFile.java | 8 +- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 28 +- .../hadoop/hdfs/TestDFSStripedInputStream.java | 50 +-- .../hadoop/hdfs/TestDFSStripedOutputStream.java | 27 +- .../TestDFSStripedOutputStreamWithFailure.java | 37 +- .../hdfs/TestDFSXORStripedInputStream.java | 33 -- .../hdfs/TestDFSXORStripedOutputStream.java | 35 -- ...estDFSXORStripedOutputStreamWithFailure.java | 36 -- ...tyPreemptionPolicyForReservedContainers.java | 430 +++++++++++++++++++ 12 files changed, 471 insertions(+), 240 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfd8076f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCodeConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCodeConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCodeConstants.java index ffa0bce..8d6ff85 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCodeConstants.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/erasurecode/ErasureCodeConstants.java @@ -38,7 +38,4 @@ public final class ErasureCodeConstants { public static final ECSchema RS_6_3_LEGACY_SCHEMA = new ECSchema( RS_LEGACY_CODEC_NAME, 6, 3); - - public static final ECSchema XOR_2_1_SCHEMA = new ECSchema( - XOR_CODEC_NAME, 2, 1); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfd8076f/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java index b55b4df..acbc8f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java @@ -147,7 +147,6 @@ public final class HdfsConstants { public static final byte RS_6_3_POLICY_ID = 0; public static final byte RS_3_2_POLICY_ID = 1; public static final byte RS_6_3_LEGACY_POLICY_ID = 2; - public static final byte XOR_2_1_POLICY_ID = 3; /* Hidden constructor */ protected HdfsConstants() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfd8076f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java index 8a85d23..c4bc8de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingPolicyManager.java @@ -36,7 +36,7 @@ import java.util.TreeMap; public final class ErasureCodingPolicyManager { /** - * TODO: HDFS-8095. + * TODO: HDFS-8095 */ private static final int DEFAULT_CELLSIZE = 64 * 1024; private static final ErasureCodingPolicy SYS_POLICY1 = @@ -48,14 +48,10 @@ public final class ErasureCodingPolicyManager { private static final ErasureCodingPolicy SYS_POLICY3 = new ErasureCodingPolicy(ErasureCodeConstants.RS_6_3_LEGACY_SCHEMA, DEFAULT_CELLSIZE, HdfsConstants.RS_6_3_LEGACY_POLICY_ID); - private static final ErasureCodingPolicy SYS_POLICY4 = - new ErasureCodingPolicy(ErasureCodeConstants.XOR_2_1_SCHEMA, - DEFAULT_CELLSIZE, HdfsConstants.XOR_2_1_POLICY_ID); //We may add more later. private static final ErasureCodingPolicy[] SYS_POLICIES = - new ErasureCodingPolicy[]{SYS_POLICY1, SYS_POLICY2, SYS_POLICY3, - SYS_POLICY4}; + new ErasureCodingPolicy[]{SYS_POLICY1, SYS_POLICY2, SYS_POLICY3}; // Supported storage policies for striped EC files private static final byte[] SUITABLE_STORAGE_POLICIES_FOR_EC_STRIPED_MODE = new byte[] { @@ -101,19 +97,6 @@ public final class ErasureCodingPolicyManager { } /** - * Get system-wide policy by policy ID. - * @return ecPolicy - */ - public static ErasureCodingPolicy getPolicyByPolicyID(byte id) { - for (ErasureCodingPolicy policy : SYS_POLICIES) { - if (policy.getId() == id) { - return policy; - } - } - return null; - } - - /** * Get all policies that's available to use. * @return all policies */ @@ -158,7 +141,7 @@ public final class ErasureCodingPolicyManager { } /** - * Clear and clean up. + * Clear and clean up */ public void clear() { activePoliciesByName.clear(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfd8076f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index a5dcee9..37f97db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -455,13 +455,9 @@ public class INodeFile extends INodeWithAdditionalFields if(!isStriped()){ return max; } - + // TODO support more policies based on policyId ErasureCodingPolicy ecPolicy = - ErasureCodingPolicyManager.getPolicyByPolicyID( - getErasureCodingPolicyID()); - if (ecPolicy == null){ - ecPolicy = ErasureCodingPolicyManager.getSystemDefaultPolicy(); - } + ErasureCodingPolicyManager.getSystemDefaultPolicy(); return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfd8076f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 1fbc1d9..13e2656 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1888,41 +1888,21 @@ public class DFSTestUtil { * Creates the metadata of a file in striped layout. This method only * manipulates the NameNode state without injecting data to DataNode. * You should disable periodical heartbeat before use this. - * @param file Path of the file to create + * @param file Path of the file to create * @param dir Parent path of the file * @param numBlocks Number of striped block groups to add to the file * @param numStripesPerBlk Number of striped cells in each block * @param toMkdir */ - public static void createStripedFile(MiniDFSCluster cluster, Path file, - Path dir, int numBlocks, int numStripesPerBlk, boolean toMkdir) - throws Exception { - createStripedFile(cluster, file, dir, numBlocks, numStripesPerBlk, - toMkdir, null); - } - - /** - * Creates the metadata of a file in striped layout. This method only - * manipulates the NameNode state without injecting data to DataNode. - * You should disable periodical heartbeat before use this. - * @param file Path of the file to create - * @param dir Parent path of the file - * @param numBlocks Number of striped block groups to add to the file - * @param numStripesPerBlk Number of striped cells in each block - * @param toMkdir - * @param ecPolicy erasure coding policy apply to created file. A null value - * means using default erasure coding policy. - */ - public static void createStripedFile(MiniDFSCluster cluster, Path file, - Path dir, int numBlocks, int numStripesPerBlk, boolean toMkdir, - ErasureCodingPolicy ecPolicy) throws Exception { + public static void createStripedFile(MiniDFSCluster cluster, Path file, Path dir, + int numBlocks, int numStripesPerBlk, boolean toMkdir) throws Exception { DistributedFileSystem dfs = cluster.getFileSystem(); // If outer test already set EC policy, dir should be left as null if (toMkdir) { assert dir != null; dfs.mkdirs(dir); try { - dfs.getClient().setErasureCodingPolicy(dir.toString(), ecPolicy); + dfs.getClient().setErasureCodingPolicy(dir.toString(), null); } catch (IOException e) { if (!e.getMessage().contains("non-empty directory")) { throw e; http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfd8076f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java index 121b9a4..3b46c66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java @@ -64,34 +64,20 @@ public class TestDFSStripedInputStream { private DistributedFileSystem fs; private final Path dirPath = new Path("/striped"); private Path filePath = new Path(dirPath, "file"); - private ErasureCodingPolicy ecPolicy; - private short dataBlocks; - private short parityBlocks; - private int cellSize; + private final ErasureCodingPolicy ecPolicy = + ErasureCodingPolicyManager.getSystemDefaultPolicy(); + private final short dataBlocks = (short) ecPolicy.getNumDataUnits(); + private final short parityBlocks = (short) ecPolicy.getNumParityUnits(); + private final int cellSize = ecPolicy.getCellSize(); private final int stripesPerBlock = 2; - private int blockSize; - private int blockGroupSize; + private final int blockSize = stripesPerBlock * cellSize; + private final int blockGroupSize = dataBlocks * blockSize; @Rule public Timeout globalTimeout = new Timeout(300000); - public ErasureCodingPolicy getEcPolicy() { - return ErasureCodingPolicyManager.getSystemDefaultPolicy(); - } - @Before public void setup() throws IOException { - /* - * Initialize erasure coding policy. - */ - ecPolicy = getEcPolicy(); - dataBlocks = (short) ecPolicy.getNumDataUnits(); - parityBlocks = (short) ecPolicy.getNumParityUnits(); - cellSize = ecPolicy.getCellSize(); - blockSize = stripesPerBlock * cellSize; - blockGroupSize = dataBlocks * blockSize; - System.out.println("EC policy = " + ecPolicy); - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0); if (ErasureCodeNative.isNativeCodeLoaded()) { @@ -108,7 +94,7 @@ public class TestDFSStripedInputStream { } fs = cluster.getFileSystem(); fs.mkdirs(dirPath); - fs.getClient().setErasureCodingPolicy(dirPath.toString(), ecPolicy); + fs.getClient().setErasureCodingPolicy(dirPath.toString(), null); } @After @@ -120,13 +106,13 @@ public class TestDFSStripedInputStream { } /** - * Test {@link DFSStripedInputStream#getBlockAt(long)}. + * Test {@link DFSStripedInputStream#getBlockAt(long)} */ @Test public void testRefreshBlock() throws Exception { final int numBlocks = 4; DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, - stripesPerBlock, false, ecPolicy); + stripesPerBlock, false); LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( filePath.toString(), 0, blockGroupSize * numBlocks); final DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(), @@ -150,7 +136,7 @@ public class TestDFSStripedInputStream { public void testPread() throws Exception { final int numBlocks = 2; DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, - stripesPerBlock, false, ecPolicy); + stripesPerBlock, false); LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( filePath.toString(), 0, blockGroupSize * numBlocks); int fileLen = blockGroupSize * numBlocks; @@ -168,9 +154,7 @@ public class TestDFSStripedInputStream { bg.getBlock().getBlockPoolId()); } - /** - * A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks - */ + /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */ for (int i = 0; i < stripesPerBlock; i++) { for (int j = 0; j < dataBlocks; j++) { for (int k = 0; k < cellSize; k++) { @@ -210,7 +194,7 @@ public class TestDFSStripedInputStream { final int numBlocks = 4; final int failedDNIdx = dataBlocks - 1; DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, - stripesPerBlock, false, ecPolicy); + stripesPerBlock, false); LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( filePath.toString(), 0, blockGroupSize); @@ -321,7 +305,7 @@ public class TestDFSStripedInputStream { setup(); } DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, - stripesPerBlock, false, ecPolicy); + stripesPerBlock, false); LocatedBlocks lbs = fs.getClient().namenode. getBlockLocations(filePath.toString(), 0, fileSize); @@ -346,9 +330,7 @@ public class TestDFSStripedInputStream { byte[] expected = new byte[fileSize]; for (LocatedBlock bg : lbs.getLocatedBlocks()) { - /** - * A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks - */ + /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */ for (int i = 0; i < stripesPerBlock; i++) { for (int j = 0; j < dataBlocks; j++) { for (int k = 0; k < cellSize; k++) { @@ -389,7 +371,7 @@ public class TestDFSStripedInputStream { final int numBlocks = 4; final int failedDNIdx = dataBlocks - 1; DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks, - stripesPerBlock, false, ecPolicy); + stripesPerBlock, false); LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations( filePath.toString(), 0, blockGroupSize); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfd8076f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java index 5bde16e..b686f28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java @@ -47,36 +47,23 @@ public class TestDFSStripedOutputStream { GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL); } - private ErasureCodingPolicy ecPolicy; - private int dataBlocks; - private int parityBlocks; + private final ErasureCodingPolicy ecPolicy = + ErasureCodingPolicyManager.getSystemDefaultPolicy(); + private final int dataBlocks = ecPolicy.getNumDataUnits(); + private final int parityBlocks = ecPolicy.getNumParityUnits(); private MiniDFSCluster cluster; private DistributedFileSystem fs; private Configuration conf; - private int cellSize; + private final int cellSize = ecPolicy.getCellSize(); private final int stripesPerBlock = 4; - private int blockSize; + private final int blockSize = cellSize * stripesPerBlock; @Rule public Timeout globalTimeout = new Timeout(300000); - public ErasureCodingPolicy getEcPolicy() { - return ErasureCodingPolicyManager.getSystemDefaultPolicy(); - } - @Before public void setup() throws IOException { - /* - * Initialize erasure coding policy. - */ - ecPolicy = getEcPolicy(); - dataBlocks = (short) ecPolicy.getNumDataUnits(); - parityBlocks = (short) ecPolicy.getNumParityUnits(); - cellSize = ecPolicy.getCellSize(); - blockSize = stripesPerBlock * cellSize; - System.out.println("EC policy = " + ecPolicy); - int numDNs = dataBlocks + parityBlocks + 2; conf = new Configuration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); @@ -89,7 +76,7 @@ public class TestDFSStripedOutputStream { NativeRSRawErasureCoderFactory.class.getCanonicalName()); } cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); - cluster.getFileSystem().getClient().setErasureCodingPolicy("/", ecPolicy); + cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null); fs = cluster.getFileSystem(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfd8076f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java index 0baf9cc..cde07a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java @@ -47,7 +47,6 @@ import org.apache.hadoop.util.StringUtils; import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Assume; -import org.junit.Before; import org.junit.Test; import java.io.IOException; @@ -77,36 +76,18 @@ public class TestDFSStripedOutputStreamWithFailure { .getLogger().setLevel(Level.ALL); } - private ErasureCodingPolicy ecPolicy; - private int dataBlocks; - private int parityBlocks; - private int cellSize; + private final ErasureCodingPolicy ecPolicy = + ErasureCodingPolicyManager.getSystemDefaultPolicy(); + private final int dataBlocks = ecPolicy.getNumDataUnits(); + private final int parityBlocks = ecPolicy.getNumParityUnits(); + private final int cellSize = ecPolicy.getCellSize(); private final int stripesPerBlock = 4; - private int blockSize; - private int blockGroupSize; + private final int blockSize = cellSize * stripesPerBlock; + private final int blockGroupSize = blockSize * dataBlocks; private static final int FLUSH_POS = 9 * DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1; - public ErasureCodingPolicy getEcPolicy() { - return ErasureCodingPolicyManager.getSystemDefaultPolicy(); - } - - /* - * Initialize erasure coding policy. - */ - @Before - public void init(){ - ecPolicy = getEcPolicy(); - dataBlocks = ecPolicy.getNumDataUnits(); - parityBlocks = ecPolicy.getNumParityUnits(); - cellSize = ecPolicy.getCellSize(); - blockSize = cellSize * stripesPerBlock; - blockGroupSize = blockSize * dataBlocks; - dnIndexSuite = getDnIndexSuite(); - lengths = newLengths(); - } - List<Integer> newLengths() { final List<Integer> lens = new ArrayList<>(); lens.add(FLUSH_POS + 2); @@ -123,7 +104,7 @@ public class TestDFSStripedOutputStreamWithFailure { return lens; } - private int[][] dnIndexSuite; + private final int[][] dnIndexSuite = getDnIndexSuite(); private int[][] getDnIndexSuite() { final int maxNumLevel = 2; @@ -186,7 +167,7 @@ public class TestDFSStripedOutputStreamWithFailure { return positions; } - private List<Integer> lengths; + private final List<Integer> lengths = newLengths(); Integer getLength(int i) { return i >= 0 && i < lengths.size()? lengths.get(i): null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfd8076f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedInputStream.java deleted file mode 100644 index 75062e0..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedInputStream.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; -import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; - -/** - * This tests read operation of DFS striped file with XOR-2-1-64k erasure code - * policy. - */ -public class TestDFSXORStripedInputStream extends TestDFSStripedInputStream{ - - public ErasureCodingPolicy getEcPolicy() { - return ErasureCodingPolicyManager.getPolicyByPolicyID( - HdfsConstants.XOR_2_1_POLICY_ID); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfd8076f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStream.java deleted file mode 100644 index 64bddb8..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStream.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; - -/** - * This tests write operation of DFS striped file with XOR-2-1-64k erasure code - * policy. - */ -public class TestDFSXORStripedOutputStream extends TestDFSStripedOutputStream{ - - @Override - public ErasureCodingPolicy getEcPolicy() { - return ErasureCodingPolicyManager.getPolicyByPolicyID( - HdfsConstants.XOR_2_1_POLICY_ID); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfd8076f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStreamWithFailure.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStreamWithFailure.java deleted file mode 100644 index ed361a8..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSXORStripedOutputStreamWithFailure.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; - -/** - * This tests write operation of DFS striped file with XOR-2-1-64k erasure code - * policy when there is data node failure. - */ -public class TestDFSXORStripedOutputStreamWithFailure - extends TestDFSStripedOutputStreamWithFailure{ - - @Override - public ErasureCodingPolicy getEcPolicy() { - return ErasureCodingPolicyManager.getPolicyByPolicyID( - HdfsConstants.XOR_2_1_POLICY_ID); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfd8076f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java new file mode 100644 index 0000000..38b2e78 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java @@ -0,0 +1,430 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class TestProportionalCapacityPreemptionPolicyForReservedContainers + extends ProportionalCapacityPreemptionPolicyMockFramework { + @Before + public void setup() { + super.setup(); + conf.setBoolean( + CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS, + true); + policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); + } + + @Test + public void testPreemptionForSimpleReservedContainer() throws IOException { + /** + * The simplest test of reserved container, Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * Guaranteed resource of a/b are 50:50 + * Total cluster resource = 100 + * - A has 90 containers on two node, n1 has 45, n2 has 45, size of each + * container is 1. + * - B has am container at n1, and reserves 1 container with size = 9 at n1, + * so B needs to preempt 9 containers from A at n1 instead of randomly + * preempt from n1 and n2. + */ + String labelsConfig = + "=100,true;"; + String nodesConfig = // n1 / n2 has no label + "n1= res=50;" + + "n2= res=50"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 9 9]);" + //root + "-a(=[50 100 90 0]);" + // a + "-b(=[50 100 10 9 9])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,,45,false)" // 45 in n1 + + "(1,1,n2,,45,false);" + // 45 in n2 + "b\t" // app2 in b + + "(1,1,n1,,1,false)" // AM container in n1 + + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Total 5 preempted from app1 at n1, don't preempt container from other + // app/node + verify(mDisp, times(5)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(5)).handle( + argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n1", 1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testUseReservedAndFifoSelectorTogether() throws IOException { + /** + * Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * Guaranteed resource of a/b are 30:70 + * Total cluster resource = 100 + * - A has 45 containers on two node, n1 has 10, n2 has 35, size of each + * container is 1. + * - B has 5 containers at n2, and reserves 1 container with size = 50 at n1, + * B also has 20 pending resources. + * so B needs to preempt: + * - 10 containers from n1 (for reserved) + * - 5 containers from n2 for pending resources + */ + String labelsConfig = + "=100,true;"; + String nodesConfig = // n1 / n2 has no label + "n1= res=50;" + + "n2= res=50"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 70 10]);" + //root + "-a(=[30 100 45 0]);" + // a + "-b(=[70 100 55 70 50])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n2,,35,false)" // 35 in n2 + + "(1,1,n1,,10,false);" + // 10 in n1 + "b\t" // app2 in b + + "(1,1,n2,,5,false)" // 5 in n2 + + "(1,50,n1,,1,true)"; // 1 container with size=50 reserved at n1 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(10)).handle( + argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n1", 1)))); + verify(mDisp, times(5)).handle( + argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n2", 1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testReservedSelectorSkipsAMContainer() throws IOException { + /** + * Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * Guaranteed resource of a/b are 30:70 + * Total cluster resource = 100 + * - A has 45 containers on two node, n1 has 10, n2 has 35, size of each + * container is 1. + * - B has 5 containers at n2, and reserves 1 container with size = 50 at n1, + * B also has 20 pending resources. + * + * Ideally B needs to preempt: + * - 10 containers from n1 (for reserved) + * - 5 containers from n2 for pending resources + * + * However, since one AM container is located at n1 (from queueA), we cannot + * preempt 10 containers from n1 for reserved container. Instead, we will + * preempt 15 containers from n2, since containers from queueA launched in n2 + * are later than containers from queueA launched in n1 (FIFO order of containers) + */ + String labelsConfig = + "=100,true;"; + String nodesConfig = // n1 / n2 has no label + "n1= res=50;" + + "n2= res=50"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 70 10]);" + //root + "-a(=[30 100 45 0]);" + // a + "-b(=[70 100 55 70 50])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,,10,false)" // 10 in n1 + + "(1,1,n2,,35,false);" +// 35 in n2 + "b\t" // app2 in b + + "(1,1,n2,,5,false)" // 5 in n2 + + "(1,50,n1,,1,true)"; // 1 container with size=50 reserved at n1 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle( + argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n1", 1)))); + verify(mDisp, times(15)).handle( + argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n2", 1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testPreemptionForReservedContainerRespectGuaranteedResource() + throws IOException { + /** + * Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * Guaranteed resource of a/b are 85:15 + * Total cluster resource = 100 + * - A has 90 containers on two node, n1 has 45, n2 has 45, size of each + * container is 1. + * - B has am container at n1, and reserves 1 container with size = 9 at n1, + * + * If we preempt 9 containers from queue-A, queue-A will be below its + * guaranteed resource = 90 - 9 = 81 < 85. + * + * So no preemption will take place + */ + String labelsConfig = + "=100,true;"; + String nodesConfig = // n1 / n2 has no label + "n1= res=50;" + + "n2= res=50"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 9 9]);" + //root + "-a(=[85 100 90 0]);" + // a + "-b(=[15 100 10 9 9])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,,45,false)" // 45 in n1 + + "(1,1,n2,,45,false);" + // 45 in n2 + "b\t" // app2 in b + + "(1,1,n1,,1,false)" // AM container in n1 + + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testPreemptionForReservedContainerWhichHasAvailableResource() + throws IOException { + /** + * Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * + * Guaranteed resource of a/b are 50:50 + * Total cluster resource = 100 + * - A has 90 containers on two node, n1 has 45, n2 has 45, size of each + * container is 1. + * - B has am container at n1, and reserves 1 container with size = 9 at n1, + * + * So we can get 4 containers preempted after preemption. + * (reserved 5 + preempted 4) = 9 + */ + String labelsConfig = + "=100,true;"; + String nodesConfig = // n1 / n2 has no label + "n1= res=50;" + + "n2= res=50"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 99 9 9]);" + //root + "-a(=[50 100 90 0]);" + // a + "-b(=[50 100 9 9 9])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,,45,false)" // 45 in n1 + + "(1,1,n2,,45,false);" + // 45 in n2 + "b\t" // app2 in b + + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Total 4 preempted from app1 at n1, don't preempt container from other + // app/node + verify(mDisp, times(4)).handle(argThat( + new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n1", 1)))); + verify(mDisp, times(0)).handle(argThat( + new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n2", 1)))); + } + + @Test + public void testPreemptionForReservedContainerWhichHasNondivisibleAvailableResource() + throws IOException { + /** + * Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * + * Guaranteed resource of a/b are 50:50 + * Total cluster resource = 100 + * - A has 45 containers on two node, size of each container is 2, + * n1 has 23, n2 has 22 + * - B reserves 1 container with size = 9 at n1, + * + * So we can get 4 containers (total-resource = 8) preempted after + * preemption. Actual required is 3.5, but we need to preempt integer + * number of containers + */ + String labelsConfig = + "=100,true;"; + String nodesConfig = // n1 / n2 has no label + "n1= res=50;" + + "n2= res=50"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 99 9 9]);" + //root + "-a(=[50 100 90 0]);" + // a + "-b(=[50 100 9 9 9])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,2,n1,,24,false)" // 48 in n1 + + "(1,2,n2,,23,false);" + // 46 in n2 + "b\t" // app2 in b + + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Total 4 preempted from app1 at n1, don't preempt container from other + // app/node + verify(mDisp, times(4)).handle(argThat( + new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n1", 1)))); + verify(mDisp, times(0)).handle(argThat( + new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n2", 1)))); + } + + @Test + public void testPreemptionForReservedContainerRespectAvailableResources() + throws IOException { + /** + * Queue structure is: + * + * <pre> + * root + * / \ + * a b + * </pre> + * + * Guaranteed resource of a/b are 50:50 + * Total cluster resource = 100, 4 nodes, 25 on each node + * - A has 10 containers on every node, size of container is 2 + * - B reserves 1 container with size = 9 at n1, + * + * So even if we cannot allocate container for B now, no preemption should + * happen since there're plenty of available resources. + */ + String labelsConfig = + "=100,true;"; + String nodesConfig = + "n1= res=25;" + + "n2= res=25;" + + "n3= res=25;" + + "n4= res=25;"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 89 9 9]);" + //root + "-a(=[50 100 80 0]);" + // a + "-b(=[50 100 9 9 9])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,2,n1,,10,false)" // 10 in n1 + + "(1,2,n2,,10,false)" // 10 in n2 + + "(1,2,n3,,10,false)" // 10 in n3 + + "(1,2,n4,,10,false);" + // 10 in n4 + "b\t" // app2 in b + + "(1,9,n1,,1,true)"; // 1 container with size=5 reserved at n1 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // No preemption should happen + verify(mDisp, times(0)).handle(argThat( + new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n1", 1)))); + verify(mDisp, times(0)).handle(argThat( + new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n2", 1)))); + verify(mDisp, times(0)).handle(argThat( + new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n3", 1)))); + verify(mDisp, times(0)).handle(argThat( + new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n4", 1)))); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org