This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2096304 HDFS-15120. Refresh BlockPlacementPolicy at runtime. Contributed by Jinglun.
2096304 is described below
commit 209630472a3216c9f13bcffa62b553f9fb7675ca
Author: Ayush Saxena <[email protected]>
AuthorDate: Thu Feb 27 02:22:24 2020 +0530
HDFS-15120. Refresh BlockPlacementPolicy at runtime. Contributed by Jinglun.
---
.../hdfs/server/blockmanagement/BlockManager.java | 10 +-
.../hadoop/hdfs/server/namenode/NameNode.java | 15 ++-
.../namenode/TestRefreshBlockPlacementPolicy.java | 131 +++++++++++++++++++++
.../org/apache/hadoop/hdfs/tools/TestDFSAdmin.java | 10 +-
4 files changed, 161 insertions(+), 5 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index cb031a9..605f502 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -438,7 +438,7 @@ public class BlockManager implements BlockStatsMXBean {
private double reconstructionQueuesInitProgress = 0.0;
/** for block replicas placement */
- private BlockPlacementPolicies placementPolicies;
+ private volatile BlockPlacementPolicies placementPolicies;
private final BlockStoragePolicySuite storagePolicySuite;
/** Check whether name system is running before terminating */
@@ -775,6 +775,14 @@ public class BlockManager implements BlockStatsMXBean {
return placementPolicies.getPolicy(CONTIGUOUS);
}
+ public void refreshBlockPlacementPolicy(Configuration conf) {
+ BlockPlacementPolicies bpp =
+ new BlockPlacementPolicies(conf, datanodeManager.getFSClusterStats(),
+ datanodeManager.getNetworkTopology(),
+ datanodeManager.getHost2DatanodeMap());
+ placementPolicies = bpp;
+ }
+
/** Dump meta data to out. */
public void metaSave(PrintWriter out) {
assert namesystem.hasReadLock(); // TODO: block manager read lock and NS write lock
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 2a74190..74757e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -184,6 +184,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_OBSERVATION_WINDOW_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_ENABLE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
@@ -322,7 +324,9 @@ public class NameNode extends ReconfigurableBase implements
DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
- DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION));
+ DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
+ DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+ DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY));
private static final String USAGE = "Usage: hdfs namenode ["
+ StartupOption.BACKUP.getName() + "] | \n\t["
@@ -2179,6 +2183,10 @@ public class NameNode extends ReconfigurableBase implements
|| property.equals(
DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) {
return reconfReplicationParameters(newVal, property);
+ } else if (property.equals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY) || property
+ .equals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY)) {
+ reconfBlockPlacementPolicy();
+ return newVal;
} else {
throw new ReconfigurationException(property, newVal, getConf().get(
property));
@@ -2223,6 +2231,11 @@ public class NameNode extends ReconfigurableBase implements
}
}
+ private void reconfBlockPlacementPolicy() {
+ getNamesystem().getBlockManager()
+ .refreshBlockPlacementPolicy(getNewConf());
+ }
+
private int adjustNewVal(int defaultVal, String newVal) {
if (newVal == null) {
return defaultVal;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshBlockPlacementPolicy.java
new file mode 100644
index 0000000..b431db7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshBlockPlacementPolicy.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.AddBlockFlag;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.net.Node;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test refresh block placement policy.
+ */
+public class TestRefreshBlockPlacementPolicy {
+ private MiniDFSCluster cluster;
+ private Configuration config;
+ private static int counter = 0;
+ static class MockBlockPlacementPolicy extends BlockPlacementPolicyDefault {
+ @Override
+ public DatanodeStorageInfo[] chooseTarget(String srcPath,
+ int numOfReplicas,
+ Node writer,
+ List<DatanodeStorageInfo> chosen,
+ boolean returnChosenNodes,
+ Set<Node> excludedNodes,
+ long blocksize,
+ BlockStoragePolicy storagePolicy,
+ EnumSet<AddBlockFlag> flags) {
+ counter++;
+ return super.chooseTarget(srcPath, numOfReplicas, writer, chosen,
+ returnChosenNodes, excludedNodes, blocksize, storagePolicy, flags);
+ }
+ }
+
+ @Before
+ public void setup() throws IOException {
+ config = new Configuration();
+ config.setClass(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+ MockBlockPlacementPolicy.class, BlockPlacementPolicy.class);
+ config.setClass(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
+ MockBlockPlacementPolicy.class, BlockPlacementPolicy.class);
+ cluster = new MiniDFSCluster.Builder(config).numDataNodes(9).build();
+ cluster.waitActive();
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ cluster.shutdown();
+ }
+
+ @Test
+ public void testRefreshReplicationPolicy() throws Exception {
+ Path file = new Path("/test-file");
+ DistributedFileSystem dfs = cluster.getFileSystem();
+
+ verifyRefreshPolicy(dfs, file, () -> cluster.getNameNode()
+ .reconfigurePropertyImpl(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, null));
+ }
+
+ @Test
+ public void testRefreshEcPolicy() throws Exception {
+ Path ecDir = new Path("/ec");
+ Path file = new Path("/ec/test-file");
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ dfs.mkdir(ecDir, FsPermission.createImmutable((short)755));
+ dfs.setErasureCodingPolicy(ecDir, null);
+
+ verifyRefreshPolicy(dfs, file, () -> cluster.getNameNode()
+ .reconfigurePropertyImpl(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, null));
+ }
+
+ @FunctionalInterface
+ private interface Refresh {
+ void refresh() throws ReconfigurationException;
+ }
+
+ private void verifyRefreshPolicy(DistributedFileSystem dfs, Path file,
+ Refresh func) throws IOException, ReconfigurationException {
+ // Choose datanode using the mock policy.
+ int lastCounter = counter;
+ OutputStream out = dfs.create(file, true);
+ out.write("test".getBytes());
+ out.close();
+ assert(counter > lastCounter);
+
+ // Refresh to the default policy.
+ func.refresh();
+
+ lastCounter = counter;
+ dfs.delete(file, true);
+ out = dfs.create(file, true);
+ out.write("test".getBytes());
+ out.close();
+ assertEquals(lastCounter, counter);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index fb3b584..366e07c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -22,6 +22,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
@@ -419,9 +421,11 @@ public class TestDFSAdmin {
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("namenode", address, outs, errs);
- assertEquals(10, outs.size());
- assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(1));
- assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(2));
+ assertEquals(12, outs.size());
+ assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(1));
+ assertEquals(DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, outs.get(2));
+ assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(3));
+ assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(4));
assertEquals(errs.size(), 0);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]