This is an automated email from the ASF dual-hosted git repository.

tomscut pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new e16ae55833e HDFS-16568. dfsadmin -reconfig option to start/query reconfig on all live datanodes (#4264)
e16ae55833e is described below

commit e16ae55833e657087903281ea05636e1ffa2ec3e
Author: Viraj Jasani <[email protected]>
AuthorDate: Tue May 10 17:10:03 2022 -0700

    HDFS-16568. dfsadmin -reconfig option to start/query reconfig on all live datanodes (#4264)
    
    
    Signed-off-by: Tao Li <[email protected]>
---
 .../org/apache/hadoop/hdfs/tools/DFSAdmin.java     | 122 ++++++++++++++++++---
 .../hadoop-hdfs/src/site/markdown/HDFSCommands.md  |   4 +-
 .../hadoop-hdfs/src/site/markdown/HdfsUserGuide.md |   8 +-
 .../org/apache/hadoop/hdfs/tools/TestDFSAdmin.java |  62 +++++++++--
 4 files changed, 171 insertions(+), 25 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index 38d6c1c3710..982bca19c58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -36,7 +36,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
 
@@ -447,8 +450,7 @@ public class DFSAdmin extends FsShell {
     "\t[-refreshSuperUserGroupsConfiguration]\n" +
     "\t[-refreshCallQueue]\n" +
     "\t[-refresh <host:ipc_port> <key> [arg1..argn]\n" +
-    "\t[-reconfig <namenode|datanode> <host:ipc_port> " +
-      "<start|status|properties>]\n" +
+      "\t[-reconfig <namenode|datanode> <host:ipc_port|livenodes> <start|status|properties>]\n" +
     "\t[-printTopology]\n" +
       "\t[-refreshNamenodes datanode_host:ipc_port]\n" +
       "\t[-getVolumeReport datanode_host:ipc_port]\n" +
@@ -1199,12 +1201,14 @@ public class DFSAdmin extends FsShell {
 
     String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n";
 
-    String reconfig = "-reconfig <namenode|datanode> <host:ipc_port> " +
+    String reconfig = "-reconfig <namenode|datanode> <host:ipc_port|livenodes> " +
         "<start|status|properties>:\n" +
         "\tStarts or gets the status of a reconfiguration operation, \n" +
         "\tor gets a list of reconfigurable properties.\n" +
-
-        "\tThe second parameter specifies the node type\n";
+        "\tThe second parameter specifies the node type\n" +
+        "\tThe third parameter specifies host address. For start or status, \n" +
+        "\tdatanode supports livenodes as third parameter, which will start \n" +
+        "\tor retrieve reconfiguration on all live datanodes.";
     String genericRefresh = "-refresh: Arguments are <hostname:ipc_port>" +
             " <resource_identifier> [arg1..argn]\n" +
             "\tTriggers a runtime-refresh of the resource specified by " +
@@ -1844,15 +1848,15 @@ public class DFSAdmin extends FsShell {
     return 0;
   }
 
-  public int reconfig(String[] argv, int i) throws IOException {
+  public int reconfig(String[] argv, int i) throws IOException, InterruptedException {
     String nodeType = argv[i];
     String address = argv[i + 1];
     String op = argv[i + 2];
 
     if ("start".equals(op)) {
-      return startReconfiguration(nodeType, address, System.out, System.err);
+      return startReconfigurationUtil(nodeType, address, System.out, System.err);
     } else if ("status".equals(op)) {
-      return getReconfigurationStatus(nodeType, address, System.out, System.err);
+      return getReconfigurationStatusUtil(nodeType, address, System.out, System.err);
     } else if ("properties".equals(op)) {
       return getReconfigurableProperties(nodeType, address, System.out,
           System.err);
@@ -1862,12 +1866,57 @@ public class DFSAdmin extends FsShell {
   }
 
   int startReconfiguration(final String nodeThpe, final String address)
-      throws IOException {
-    return startReconfiguration(nodeThpe, address, System.out, System.err);
+      throws IOException, InterruptedException {
+    return startReconfigurationUtil(nodeThpe, address, System.out, System.err);
+  }
+
+  int startReconfigurationUtil(final String nodeType, final String address, final PrintStream out,
+      final PrintStream err) throws IOException, InterruptedException {
+    if (!"livenodes".equals(address)) {
+      return startReconfiguration(nodeType, address, out, err);
+    }
+    if (!"datanode".equals(nodeType)) {
+      err.println("Only datanode type supports reconfiguration in bulk.");
+      return 1;
+    }
+    ExecutorService executorService = Executors.newFixedThreadPool(5);
+    DistributedFileSystem dfs = getDFS();
+    DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE);
+    AtomicInteger successCount = new AtomicInteger();
+    AtomicInteger failCount = new AtomicInteger();
+    if (nodes != null) {
+      for (DatanodeInfo node : nodes) {
+        executorService.submit(() -> {
+          int status = startReconfiguration(nodeType, node.getIpcAddr(false), out, err);
+          if (status == 0) {
+            successCount.incrementAndGet();
+          } else {
+            failCount.incrementAndGet();
+          }
+        });
+      }
+      while ((successCount.get() + failCount.get()) < nodes.length) {
+        Thread.sleep(1000);
+      }
+      executorService.shutdown();
+      if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
+        err.println("Executor service could not be terminated in 60s. Please wait for"
+            + " sometime before the system cools down.");
+      }
+      out.println("Starting of reconfiguration task successful on " + successCount.get()
+          + " nodes, failed on " + failCount.get() + " nodes.");
+      if (failCount.get() == 0) {
+        return 0;
+      } else {
+        return 1;
+      }
+    }
+    err.println("DFS datanode stats could not be retrieved.");
+    return 1;
   }
 
   int startReconfiguration(final String nodeType, final String address,
-      final PrintStream out, final PrintStream err) throws IOException {
+      final PrintStream out, final PrintStream err) {
     String outMsg = null;
     String errMsg = null;
     int ret = 0;
@@ -1908,8 +1957,53 @@ public class DFSAdmin extends FsShell {
     }
   }
 
-  int getReconfigurationStatus(final String nodeType, final String address,
-      final PrintStream out, final PrintStream err) throws IOException {
+  int getReconfigurationStatusUtil(final String nodeType, final String address,
+      final PrintStream out, final PrintStream err) throws IOException, InterruptedException {
+    if (!"livenodes".equals(address)) {
+      return getReconfigurationStatus(nodeType, address, out, err);
+    }
+    if (!"datanode".equals(nodeType)) {
+      err.println("Only datanode type supports reconfiguration in bulk.");
+      return 1;
+    }
+    ExecutorService executorService = Executors.newFixedThreadPool(5);
+    DistributedFileSystem dfs = getDFS();
+    DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE);
+    AtomicInteger successCount = new AtomicInteger();
+    AtomicInteger failCount = new AtomicInteger();
+    if (nodes != null) {
+      for (DatanodeInfo node : nodes) {
+        executorService.submit(() -> {
+          int status = getReconfigurationStatus(nodeType, node.getIpcAddr(false), out, err);
+          if (status == 0) {
+            successCount.incrementAndGet();
+          } else {
+            failCount.incrementAndGet();
+          }
+        });
+      }
+      while ((successCount.get() + failCount.get()) < nodes.length) {
+        Thread.sleep(1000);
+      }
+      executorService.shutdown();
+      if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
+        err.println("Executor service could not be terminated in 60s. Please wait for"
+            + " sometime before the system cools down.");
+      }
+      out.println("Retrieval of reconfiguration status successful on " + successCount.get()
+          + " nodes, failed on " + failCount.get() + " nodes.");
+      if (failCount.get() == 0) {
+        return 0;
+      } else {
+        return 1;
+      }
+    }
+    err.println("DFS datanode stats could not be retrieved.");
+    return 1;
+  }
+
+  int getReconfigurationStatus(final String nodeType, final String address, final PrintStream out,
+      final PrintStream err) {
     String outMsg = null;
     String errMsg = null;
     ReconfigurationTaskStatus status = null;
@@ -2152,7 +2246,7 @@ public class DFSAdmin extends FsShell {
                          + " [-refreshCallQueue]");
     } else if ("-reconfig".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
-          + " [-reconfig <namenode|datanode> <host:ipc_port> "
+          + " [-reconfig <namenode|datanode> <host:ipc_port|livenodes> "
           + "<start|status|properties>]");
     } else if ("-refresh".equals(cmd)) {
       System.err.println("Usage: hdfs dfsadmin"
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index b21a1f14e27..090d03a6d8f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -363,7 +363,7 @@ Usage:
         hdfs dfsadmin [-refreshSuperUserGroupsConfiguration]
         hdfs dfsadmin [-refreshCallQueue]
         hdfs dfsadmin [-refresh <host:ipc_port> <key> [arg1..argn]]
-        hdfs dfsadmin [-reconfig <namenode|datanode> <host:ipc_port> <start |status |properties>]
+        hdfs dfsadmin [-reconfig <namenode|datanode> <host:ipc_port|livenodes> <start |status |properties>]
         hdfs dfsadmin [-printTopology]
         hdfs dfsadmin [-refreshNamenodes datanodehost:port]
         hdfs dfsadmin [-getVolumeReport datanodehost:port]
@@ -401,7 +401,7 @@ Usage:
 | `-refreshSuperUserGroupsConfiguration` | Refresh superuser proxy groups mappings |
 | `-refreshCallQueue` | Reload the call queue from config. |
 | `-refresh` \<host:ipc\_port\> \<key\> [arg1..argn] | Triggers a runtime-refresh of the resource specified by \<key\> on \<host:ipc\_port\>. All other args after are sent to the host. |
-| `-reconfig` \<datanode \|namenode\> \<host:ipc\_port\> \<start\|status\|properties\> | Starts reconfiguration or gets the status of an ongoing reconfiguration, or gets a list of reconfigurable properties. The second parameter specifies the node type. |
+| `-reconfig` \<datanode \|namenode\> \<host:ipc\_port\|livenodes> \<start\|status\|properties\> | Starts reconfiguration or gets the status of an ongoing reconfiguration, or gets a list of reconfigurable properties. The second parameter specifies the node type. The third parameter specifies host address. For start or status, datanode supports livenodes as third parameter, which will start or retrieve reconfiguration on all live datanodes. |
 | `-printTopology` | Print a tree of the racks and their nodes as reported by the Namenode |
 | `-refreshNamenodes` datanodehost:port | For the given datanode, reloads the configuration files, stops serving the removed block-pools and starts serving new block-pools. |
 | `-getVolumeReport` datanodehost:port | For the given datanode, get the volume report. |
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md
index 54a8056068b..16970730c92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md
@@ -351,7 +351,13 @@ Datanode supports hot swappable drives. The user can add or replace HDFS data vo
 * The user runs `dfsadmin -reconfig datanode HOST:PORT start` to start
   the reconfiguration process. The user can use
   `dfsadmin -reconfig datanode HOST:PORT status`
-  to query the running status of the reconfiguration task.
+  to query the running status of the reconfiguration task. In place of
+  HOST:PORT, we can also specify livenodes for datanode. It would allow
+  start or query reconfiguration on all live datanodes, whereas specifying
+  HOST:PORT would only allow start or query of reconfiguration on the
+  particular datanode represented by HOST:PORT. The examples for livenodes
+  queries are `dfsadmin -reconfig datanode livenodes start` and
+  `dfsadmin -reconfig datanode livenodes status`.
 
 * Once the reconfiguration task has completed, the user can safely `umount`
   the removed data volume directories and physically remove the disks.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index f8d72ca1fdb..525503b621c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -177,20 +177,20 @@ public class TestDFSAdmin {
   }
 
   private void getReconfigurableProperties(String nodeType, String address,
-      final List<String> outs, final List<String> errs) throws IOException {
+      final List<String> outs, final List<String> errs) throws IOException, InterruptedException {
     reconfigurationOutErrFormatter("getReconfigurableProperties", nodeType,
         address, outs, errs);
   }
 
   private void getReconfigurationStatus(String nodeType, String address,
-      final List<String> outs, final List<String> errs) throws IOException {
+      final List<String> outs, final List<String> errs) throws IOException, InterruptedException {
     reconfigurationOutErrFormatter("getReconfigurationStatus", nodeType,
         address, outs, errs);
   }
 
   private void reconfigurationOutErrFormatter(String methodName,
       String nodeType, String address, final List<String> outs,
-      final List<String> errs) throws IOException {
+      final List<String> errs) throws IOException, InterruptedException {
     ByteArrayOutputStream bufOut = new ByteArrayOutputStream();
     PrintStream outStream = new PrintStream(bufOut);
     ByteArrayOutputStream bufErr = new ByteArrayOutputStream();
@@ -203,9 +203,9 @@ public class TestDFSAdmin {
           outStream,
           errStream);
     } else if (methodName.equals("getReconfigurationStatus")) {
-      admin.getReconfigurationStatus(nodeType, address, outStream, errStream);
+      admin.getReconfigurationStatusUtil(nodeType, address, outStream, errStream);
     } else if (methodName.equals("startReconfiguration")) {
-      admin.startReconfiguration(nodeType, address, outStream, errStream);
+      admin.startReconfigurationUtil(nodeType, address, outStream, errStream);
     }
 
     scanIntoList(bufOut, outs);
@@ -326,7 +326,7 @@ public class TestDFSAdmin {
   }
 
   @Test(timeout = 30000)
-  public void testDataNodeGetReconfigurableProperties() throws IOException {
+  public void testDataNodeGetReconfigurableProperties() throws IOException, InterruptedException {
     final int port = datanode.getIpcPort();
     final String address = "localhost:" + port;
     final List<String> outs = Lists.newArrayList();
@@ -422,7 +422,7 @@ public class TestDFSAdmin {
   }
 
   @Test(timeout = 30000)
-  public void testNameNodeGetReconfigurableProperties() throws IOException {
+  public void testNameNodeGetReconfigurableProperties() throws IOException, InterruptedException {
     final String address = namenode.getHostAndPort();
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
@@ -452,7 +452,7 @@ public class TestDFSAdmin {
         errs.clear();
         try {
           getReconfigurationStatus(nodeType, address, outs, errs);
-        } catch (IOException e) {
+        } catch (IOException | InterruptedException e) {
           LOG.error(String.format(
               "call getReconfigurationStatus on %s[%s] failed.", nodeType,
               address), e);
@@ -1062,4 +1062,50 @@ public class TestDFSAdmin {
       }
     });
   }
+
+  @Test
+  public void testAllDatanodesReconfig()
+      throws IOException, InterruptedException, TimeoutException {
+    ReconfigurationUtil reconfigurationUtil = mock(ReconfigurationUtil.class);
+    cluster.getDataNodes().get(0).setReconfigurationUtil(reconfigurationUtil);
+    cluster.getDataNodes().get(1).setReconfigurationUtil(reconfigurationUtil);
+
+    List<ReconfigurationUtil.PropertyChange> changes = new ArrayList<>();
+    changes.add(new ReconfigurationUtil.PropertyChange(
+        DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true",
+        datanode.getConf().get(DFS_DATANODE_PEER_STATS_ENABLED_KEY)));
+    when(reconfigurationUtil.parseChangedProperties(any(Configuration.class),
+        any(Configuration.class))).thenReturn(changes);
+
+    assertEquals(0, admin.startReconfiguration("datanode", "livenodes"));
+    final List<String> outsForStartReconf = new ArrayList<>();
+    final List<String> errsForStartReconf = new ArrayList<>();
+    reconfigurationOutErrFormatter("startReconfiguration", "datanode",
+        "livenodes", outsForStartReconf, errsForStartReconf);
+    assertEquals(3, outsForStartReconf.size());
+    assertEquals(0, errsForStartReconf.size());
+    assertTrue(outsForStartReconf.get(0).startsWith("Started reconfiguration task on node"));
+    assertTrue(outsForStartReconf.get(1).startsWith("Started reconfiguration task on node"));
+    assertEquals("Starting of reconfiguration task successful on 2 nodes, failed on 0 nodes.",
+        outsForStartReconf.get(2));
+
+    Thread.sleep(1000);
+    final List<String> outs = new ArrayList<>();
+    final List<String> errs = new ArrayList<>();
+    awaitReconfigurationFinished("datanode", "livenodes", outs, errs);
+    assertEquals(9, outs.size());
+    assertEquals(0, errs.size());
+    LOG.info("dfsadmin -status -livenodes output:");
+    outs.forEach(s -> LOG.info("{}", s));
+    assertTrue(outs.get(0).startsWith("Reconfiguring status for node"));
+    assertEquals("SUCCESS: Changed property dfs.datanode.peer.stats.enabled", outs.get(2));
+    assertEquals("\tFrom: \"false\"", outs.get(3));
+    assertEquals("\tTo: \"true\"", outs.get(4));
+    assertEquals("SUCCESS: Changed property dfs.datanode.peer.stats.enabled", outs.get(5));
+    assertEquals("\tFrom: \"false\"", outs.get(6));
+    assertEquals("\tTo: \"true\"", outs.get(7));
+    assertEquals("Retrieval of reconfiguration status successful on 2 nodes, failed on 0 nodes.",
+        outs.get(8));
+  }
+
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to