This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 0ef20990cce6bf6a28932018f62a17995f041706
Author: Duo Zhang <zhang...@apache.org>
AuthorDate: Tue Jan 15 11:43:41 2019 +0800

    HBASE-21719 Rewrite RegionPlacementMaintainer to use AsyncClusterConnection
    
    Signed-off-by: Michael Stack <st...@apache.org>
---
 .../hbase/master/RegionPlacementMaintainer.java    | 225 +++++++++++----------
 1 file changed, 113 insertions(+), 112 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java
index faf5e4a..fda0a9c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionPlacementMaintainer.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.master;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
@@ -39,29 +38,30 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper;
 import org.apache.hadoop.hbase.favored.FavoredNodesPlan;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.MunkresAssignment;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
 
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
 
@@ -71,7 +71,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavor
  */
 @InterfaceAudience.Private
 // TODO: Remove? Unused. Partially implemented only.
-public class RegionPlacementMaintainer {
+public class RegionPlacementMaintainer implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(RegionPlacementMaintainer.class
       .getName());
   //The cost of a placement that should never be assigned.
@@ -96,9 +96,9 @@ public class RegionPlacementMaintainer {
   private final boolean enforceMinAssignmentMove;
   private RackManager rackManager;
   private Set<TableName> targetTableSet;
-  private final Connection connection;
+  private AsyncClusterConnection connection;
 
-  public RegionPlacementMaintainer(Configuration conf) {
+  public RegionPlacementMaintainer(Configuration conf) throws IOException {
     this(conf, true, true);
   }
 
@@ -109,11 +109,6 @@ public class RegionPlacementMaintainer {
     this.enforceMinAssignmentMove = enforceMinAssignmentMove;
     this.targetTableSet = new HashSet<>();
     this.rackManager = new RackManager(conf);
-    try {
-      this.connection = ConnectionFactory.createConnection(this.conf);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
   }
 
   private static void printHelp(Options opt) {
@@ -124,6 +119,14 @@ public class RegionPlacementMaintainer {
         " [-fs hdfs://a.b.c.d:9000] [-hbase_root /HBASE]", opt);
   }
 
+  private AsyncClusterConnection getConnection() throws IOException {
+    if (connection == null) {
+      connection =
+        ClusterConnectionFactory.createAsyncClusterConnection(this.conf, null, User.getCurrent());
+    }
+    return connection;
+  }
+
   public void setTargetTableName(String[] tableNames) {
     if (tableNames != null) {
       for (String table : tableNames)
@@ -133,10 +136,8 @@ public class RegionPlacementMaintainer {
 
   /**
    * @return the new RegionAssignmentSnapshot
-   * @throws IOException
    */
-  public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot()
-  throws IOException {
+  public SnapshotOfRegionAssignmentFromMeta getRegionAssignmentSnapshot() throws IOException {
     SnapshotOfRegionAssignmentFromMeta currentAssignmentShapshot =
       new SnapshotOfRegionAssignmentFromMeta(ConnectionFactory.createConnection(conf));
     currentAssignmentShapshot.initialize();
@@ -145,9 +146,6 @@ public class RegionPlacementMaintainer {
 
   /**
    * Verify the region placement is consistent with the assignment plan
-   * @param isDetailMode
-   * @return reports
-   * @throws IOException
    */
   public List<AssignmentVerificationReport> verifyRegionPlacement(boolean isDetailMode)
       throws IOException {
@@ -206,10 +204,9 @@ public class RegionPlacementMaintainer {
 
     // Get the all the region servers
     List<ServerName> servers = new ArrayList<>();
-    try (Admin admin = this.connection.getAdmin()) {
-      servers.addAll(admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
+    servers.addAll(
+      FutureUtils.get(getConnection().getAdmin().getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)))
         .getLiveServerMetrics().keySet());
-    }
 
     LOG.info("Start to generate assignment plan for " + numRegions +
         " regions from table " + tableName + " with " +
@@ -492,6 +489,11 @@ public class RegionPlacementMaintainer {
     return plan;
   }
 
+  @Override
+  public void close() throws IOException {
+    Closeables.close(connection, true);
+  }
+
   /**
    * Some algorithms for solving the assignment problem may traverse workers or
    * jobs in linear order which may result in skewing the assignments of the
@@ -690,19 +692,17 @@ public class RegionPlacementMaintainer {
         }
         if (singleServerPlan != null) {
           // Update the current region server with its updated favored nodes
-          BlockingInterface currentRegionServer =
-            ((ClusterConnection)this.connection).getAdmin(entry.getKey());
+          AsyncRegionServerAdmin rsAdmin = getConnection().getRegionServerAdmin(entry.getKey());
           UpdateFavoredNodesRequest request =
-              RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
-
+            RequestConverter.buildUpdateFavoredNodesRequest(regionUpdateInfos);
           UpdateFavoredNodesResponse updateFavoredNodesResponse =
-              currentRegionServer.updateFavoredNodes(null, request);
+            FutureUtils.get(rsAdmin.updateFavoredNodes(request));
           LOG.info("Region server " +
-              ProtobufUtil.getServerInfo(null, currentRegionServer).getServerName() +
-              " has updated " + updateFavoredNodesResponse.getResponse() + " / " +
-              singleServerPlan.getAssignmentMap().size() +
-              " regions with the assignment plan");
-          succeededNum ++;
+            FutureUtils.get(rsAdmin.getServerInfo(RequestConverter.buildGetServerInfoRequest()))
+              .getServerInfo() +
+            " has updated " + updateFavoredNodesResponse.getResponse() + " / " +
+            singleServerPlan.getAssignmentMap().size() + " regions with the assignment plan");
+          succeededNum++;
         }
       } catch (Exception e) {
         failedUpdateMap.put(entry.getKey(), e);
@@ -719,7 +719,7 @@ public class RegionPlacementMaintainer {
           " region servers with its corresponding favored nodes");
       for (Map.Entry<ServerName, Exception> entry :
         failedUpdateMap.entrySet() ) {
-        LOG.error("Failed to update " + entry.getKey().getHostAndPort() +
+        LOG.error("Failed to update " + entry.getKey().getAddress() +
             " because of " + entry.getValue().getMessage());
       }
     }
@@ -1019,93 +1019,94 @@ public class RegionPlacementMaintainer {
       }
 
       // Create the region placement obj
-      RegionPlacementMaintainer rp = new RegionPlacementMaintainer(conf, enforceLocality,
-          enforceMinAssignmentMove);
+      try (RegionPlacementMaintainer rp =
+        new RegionPlacementMaintainer(conf, enforceLocality, enforceMinAssignmentMove)) {
 
-      if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
-        verificationDetails = true;
-      }
-
-      if (cmd.hasOption("tables")) {
-        String tableNameListStr = cmd.getOptionValue("tables");
-        String[] tableNames = StringUtils.split(tableNameListStr, ",");
-        rp.setTargetTableName(tableNames);
-      }
+        if (cmd.hasOption("d") || cmd.hasOption("verification-details")) {
+          verificationDetails = true;
+        }
 
-      if (cmd.hasOption("munkres")) {
-        USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
-      }
+        if (cmd.hasOption("tables")) {
+          String tableNameListStr = cmd.getOptionValue("tables");
+          String[] tableNames = StringUtils.split(tableNameListStr, ",");
+          rp.setTargetTableName(tableNames);
+        }
 
-      // Read all the modes
-      if (cmd.hasOption("v") || cmd.hasOption("verify")) {
-        // Verify the region placement.
-        rp.verifyRegionPlacement(verificationDetails);
-      } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
-        // Generate the assignment plan only without updating the hbase:meta and RS
-        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
-        printAssignmentPlan(plan);
-      } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
-        // Generate the new assignment plan
-        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
-        // Print the new assignment plan
-        printAssignmentPlan(plan);
-        // Write the new assignment plan to META
-        rp.updateAssignmentPlanToMeta(plan);
-      } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
-        // Generate the new assignment plan
-        FavoredNodesPlan plan = rp.getNewAssignmentPlan();
-        // Print the new assignment plan
-        printAssignmentPlan(plan);
-        // Update the assignment to hbase:meta and Region Servers
-        rp.updateAssignmentPlan(plan);
-      } else if (cmd.hasOption("diff")) {
-        FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
-        Map<String, Map<String, Float>> locality = FSUtils
-            .getRegionDegreeLocalityMappingFromFS(conf);
-        Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
-        rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
-        System.out.println("Do you want to update the assignment plan? [y/n]");
-        Scanner s = new Scanner(System.in);
-        String input = s.nextLine().trim();
-        if (input.equals("y")) {
-          System.out.println("Updating assignment plan...");
-          rp.updateAssignmentPlan(newPlan);
+        if (cmd.hasOption("munkres")) {
+          USE_MUNKRES_FOR_PLACING_SECONDARY_AND_TERTIARY = true;
         }
-        s.close();
-      } else if (cmd.hasOption("ld")) {
-        Map<String, Map<String, Float>> locality = FSUtils
-            .getRegionDegreeLocalityMappingFromFS(conf);
-        rp.printLocalityAndDispersionForCurrentPlan(locality);
-      } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
-        FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
-        printAssignmentPlan(plan);
-      } else if (cmd.hasOption("overwrite")) {
-        if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
-          throw new IllegalArgumentException("Please specify: " +
+
+        // Read all the modes
+        if (cmd.hasOption("v") || cmd.hasOption("verify")) {
+          // Verify the region placement.
+          rp.verifyRegionPlacement(verificationDetails);
+        } else if (cmd.hasOption("n") || cmd.hasOption("dry-run")) {
+          // Generate the assignment plan only without updating the hbase:meta and RS
+          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
+          printAssignmentPlan(plan);
+        } else if (cmd.hasOption("w") || cmd.hasOption("write")) {
+          // Generate the new assignment plan
+          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
+          // Print the new assignment plan
+          printAssignmentPlan(plan);
+          // Write the new assignment plan to META
+          rp.updateAssignmentPlanToMeta(plan);
+        } else if (cmd.hasOption("u") || cmd.hasOption("update")) {
+          // Generate the new assignment plan
+          FavoredNodesPlan plan = rp.getNewAssignmentPlan();
+          // Print the new assignment plan
+          printAssignmentPlan(plan);
+          // Update the assignment to hbase:meta and Region Servers
+          rp.updateAssignmentPlan(plan);
+        } else if (cmd.hasOption("diff")) {
+          FavoredNodesPlan newPlan = rp.getNewAssignmentPlan();
+          Map<String, Map<String, Float>> locality =
+            FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
+          Map<TableName, Integer> movesPerTable = rp.getRegionsMovement(newPlan);
+          rp.checkDifferencesWithOldPlan(movesPerTable, locality, newPlan);
+          System.out.println("Do you want to update the assignment plan? [y/n]");
+          Scanner s = new Scanner(System.in);
+          String input = s.nextLine().trim();
+          if (input.equals("y")) {
+            System.out.println("Updating assignment plan...");
+            rp.updateAssignmentPlan(newPlan);
+          }
+          s.close();
+        } else if (cmd.hasOption("ld")) {
+          Map<String, Map<String, Float>> locality =
+            FSUtils.getRegionDegreeLocalityMappingFromFS(conf);
+          rp.printLocalityAndDispersionForCurrentPlan(locality);
+        } else if (cmd.hasOption("p") || cmd.hasOption("print")) {
+          FavoredNodesPlan plan = rp.getRegionAssignmentSnapshot().getExistingAssignmentPlan();
+          printAssignmentPlan(plan);
+        } else if (cmd.hasOption("overwrite")) {
+          if (!cmd.hasOption("f") || !cmd.hasOption("r")) {
+            throw new IllegalArgumentException("Please specify: " +
                " -update -r regionName -f server1:port,server2:port,server3:port");
-        }
+          }
 
-        String regionName = cmd.getOptionValue("r");
-        String favoredNodesStr = cmd.getOptionValue("f");
-        LOG.info("Going to update the region " + regionName + " with the new favored nodes " +
+          String regionName = cmd.getOptionValue("r");
+          String favoredNodesStr = cmd.getOptionValue("f");
+          LOG.info("Going to update the region " + regionName + " with the new favored nodes " +
             favoredNodesStr);
-        List<ServerName> favoredNodes = null;
-        RegionInfo regionInfo =
+          List<ServerName> favoredNodes = null;
+          RegionInfo regionInfo =
             rp.getRegionAssignmentSnapshot().getRegionNameToRegionInfoMap().get(regionName);
-        if (regionInfo == null) {
-          LOG.error("Cannot find the region " + regionName + " from the META");
-        } else {
-          try {
-            favoredNodes = getFavoredNodeList(favoredNodesStr);
-          } catch (IllegalArgumentException e) {
-            LOG.error("Cannot parse the invalid favored nodes because " + e);
+          if (regionInfo == null) {
+            LOG.error("Cannot find the region " + regionName + " from the META");
+          } else {
+            try {
+              favoredNodes = getFavoredNodeList(favoredNodesStr);
+            } catch (IllegalArgumentException e) {
+              LOG.error("Cannot parse the invalid favored nodes because " + e);
+            }
+            FavoredNodesPlan newPlan = new FavoredNodesPlan();
+            newPlan.updateFavoredNodesMap(regionInfo, favoredNodes);
+            rp.updateAssignmentPlan(newPlan);
           }
-          FavoredNodesPlan newPlan = new FavoredNodesPlan();
-          newPlan.updateFavoredNodesMap(regionInfo, favoredNodes);
-          rp.updateAssignmentPlan(newPlan);
+        } else {
+          printHelp(opt);
         }
-      } else {
-        printHelp(opt);
       }
     } catch (ParseException e) {
       printHelp(opt);

Reply via email to