huangzhaobo99 commented on code in PR #6504:
URL: https://github.com/apache/hadoop/pull/6504#discussion_r1468455612
##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java:
##########
@@ -1283,4 +1285,90 @@ public void testAllDatanodesReconfig()
Assertions.assertThat(outs.subList(1, 5)).containsSubsequence(success,
from, to);
Assertions.assertThat(outs.subList(5, 9)).containsSubsequence(success,
from, to, retrieval);
}
+
+ @Test
+ public void testDecommissionDataNodesReconfig()
+ throws IOException, InterruptedException, TimeoutException {
+ redirectStream();
+
+ ReconfigurationUtil reconfigurationUtil = mock(ReconfigurationUtil.class);
+ cluster.getDataNodes().get(0).setReconfigurationUtil(reconfigurationUtil);
+ cluster.getDataNodes().get(1).setReconfigurationUtil(reconfigurationUtil);
+ List<ReconfigurationUtil.PropertyChange> changes = new ArrayList<>();
+ changes.add(new ReconfigurationUtil.PropertyChange(
+ DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY
+ , "1000",
+
datanode.getConf().get(DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY)));
+ when(reconfigurationUtil.parseChangedProperties(any(Configuration.class),
+ any(Configuration.class))).thenReturn(changes);
+
+ DFSAdmin dfsAdmin = Mockito.spy(new DFSAdmin(conf));
+ DistributedFileSystem dfs = Mockito.spy(cluster.getFileSystem());
+ DatanodeInfo decommissionNode = dfs.getDataNodeStats()[0];
+ DatanodeInfo[] dataNodeStats = new DatanodeInfo[]{decommissionNode};
+ when(dfsAdmin.getDFS()).thenReturn(dfs);
+
when(dfs.getDataNodeStats(DatanodeReportType.DECOMMISSIONING)).thenReturn(dataNodeStats);
+
+ int ret = dfsAdmin.startReconfiguration("datanode", "decomnodes");
+
+ // collect outputs
+ final List<String> outsForStartReconf = Lists.newArrayList();
+ final List<String> errsForStartReconf = Lists.newArrayList();
+ scanIntoList(out, outsForStartReconf);
+ scanIntoList(err, errsForStartReconf);
+
+ // verify startReconfiguration results is as expected
+ assertEquals(0, ret);
+ String started = "Started reconfiguration task on node";
+ String starting =
+ "Starting of reconfiguration task successful on 1 nodes, failed on 0
nodes.";
Review Comment:
   Could we extend this test to cover multiple nodes here, so the "successful on N nodes" message is verified for N > 1?
##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java:
##########
@@ -1944,23 +1946,28 @@ int startReconfigurationUtil(final String nodeType,
final String address, final
}
ExecutorService executorService = Executors.newFixedThreadPool(5);
DistributedFileSystem dfs = getDFS();
- DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE);
+ final DatanodeInfo[] nodes = "livenodes".equals(address) ?
+ dfs.getDataNodeStats(DatanodeReportType.LIVE) :
+ dfs.getDataNodeStats(DatanodeReportType.DECOMMISSIONING);
AtomicInteger successCount = new AtomicInteger();
AtomicInteger failCount = new AtomicInteger();
if (nodes != null) {
+ final CountDownLatch latch = new CountDownLatch(nodes.length);
for (DatanodeInfo node : nodes) {
executorService.submit(() -> {
- int status = startReconfiguration(nodeType, node.getIpcAddr(false),
out, err);
- if (status == 0) {
- successCount.incrementAndGet();
- } else {
- failCount.incrementAndGet();
+ try {
+ int status = startReconfiguration(nodeType,
node.getIpcAddr(false), out, err);
+ if (status == 0) {
+ successCount.incrementAndGet();
+ } else {
+ failCount.incrementAndGet();
+ }
+ }finally {
Review Comment:
   Hi, this line needs to be reformatted — there is a missing space between `}` and `finally` (it should read `} finally {`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]