This is an automated email from the ASF dual-hosted git repository.

niuyulin pushed a commit to branch HBASE-25714
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-25714 by this push:
     new 68800b5  HBASE-26076 Support favoredNodes when do compaction offload 
(#3468)
68800b5 is described below

commit 68800b52409e12a4a9c3d01917d809423e2869cd
Author: niuyulin <[email protected]>
AuthorDate: Mon Aug 16 18:41:45 2021 +0800

    HBASE-26076 Support favoredNodes when do compaction offload (#3468)
    
    Signed-off-by: Duo Zhang <[email protected]>
---
 .../hbase/compactionserver/CSRpcServices.java      |  11 +--
 .../compactionserver/CompactionThreadManager.java  |  49 +++++----
 .../hadoop/hbase/regionserver/HRegionServer.java   |  12 ++-
 .../apache/hadoop/hbase/regionserver/HStore.java   |  22 ++++-
 .../TestRegionFavoredNodesWhenCompactOffload.java  |  96 ++++++++++++++++++
 .../hbase/regionserver/TestRegionFavoredNodes.java | 109 ++++++++++++---------
 6 files changed, 214 insertions(+), 85 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
index 25cfa1a..7faa665 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.compactionserver;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -42,7 +41,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 
 @InterfaceAudience.Private
 public class CSRpcServices extends AbstractRpcServices
@@ -96,12 +94,11 @@ public class CSRpcServices extends AbstractRpcServices
     ColumnFamilyDescriptor cfd = 
ProtobufUtil.toColumnFamilyDescriptor(request.getFamily());
     boolean major = request.getMajor();
     int priority = request.getPriority();
-    List<HBaseProtos.ServerName> favoredNodes = 
Collections.singletonList(request.getServer());
     LOG.info("Receive compaction request from {}", 
ProtobufUtil.toString(request));
-    CompactionTask compactionTask =
-        
CompactionTask.newBuilder().setRsServerName(rsServerName).setRegionInfo(regionInfo)
-            
.setColumnFamilyDescriptor(cfd).setRequestMajor(major).setPriority(priority)
-            
.setFavoredNodes(favoredNodes).setSubmitTime(System.currentTimeMillis()).build();
+    CompactionTask compactionTask = 
CompactionTask.newBuilder().setRsServerName(rsServerName)
+        
.setRegionInfo(regionInfo).setColumnFamilyDescriptor(cfd).setRequestMajor(major)
+        .setPriority(priority).setFavoredNodes(request.getFavoredNodesList())
+        .setSubmitTime(System.currentTimeMillis()).build();
     try {
       
compactionServer.compactionThreadManager.requestCompaction(compactionTask);
       return CompactionProtos.CompactResponse.newBuilder().build();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
index 2eb697d..f5fcfe2 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
@@ -150,6 +150,27 @@ public class CompactionThreadManager implements 
ThroughputControllerService {
     }
   }
 
+  /**
+   * Open store, and clean stale compacted file in cache
+   */
+  private HStore openStore(RegionInfo regionInfo, ColumnFamilyDescriptor cfd, 
boolean major,
+      MonitoredTask status) throws IOException {
+    status.setStatus("Open store");
+    HStore store = getStore(conf, server.getFileSystem(), rootDir,
+      tableDescriptors.get(regionInfo.getTable()), regionInfo, 
cfd.getNameAsString());
+    // handle TTL case
+    store.removeUnneededFiles(false);
+    // CompactedHFilesDischarger only run on regionserver, so compactionserver 
does not have
+    // opportunity to clean compacted file at that time, we clean compacted 
files here
+    compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
+      store.getStorefiles().stream().map(sf -> 
sf.getPath().getName()).collect(Collectors.toSet()));
+    if (major) {
+      status.setStatus("Trigger major compaction");
+      store.triggerMajorCompaction();
+    }
+    return store;
+  }
+
   private void selectFileAndExecuteTask(CompactionTask compactionTask) throws 
IOException {
     ServerName rsServerName = compactionTask.getRsServerName();
     RegionInfo regionInfo = compactionTask.getRegionInfo();
@@ -169,10 +190,10 @@ public class CompactionThreadManager implements 
ThroughputControllerService {
     // the three has consistent state, we need this condition to guarantee 
correct selection
     synchronized 
(compactionFilesCache.getCompactedStoreFilesAsLock(regionInfo, cfd)) {
       synchronized 
(compactionFilesCache.getSelectedStoreFilesAsLock(regionInfo, cfd)) {
-        Pair<HStore, Optional<CompactionContext>> pair = 
selectCompaction(regionInfo, cfd,
+        store = openStore(regionInfo, cfd, compactionTask.isRequestMajor(), 
status);
+        
store.assignFavoredNodesForCompactionOffload(compactionTask.getFavoredNodes());
+        Optional<CompactionContext> compaction = selectCompaction(store, 
regionInfo, cfd,
           compactionTask.isRequestMajor(), compactionTask.getPriority(), 
status, logStr);
-        store = pair.getFirst();
-        Optional<CompactionContext> compaction = pair.getSecond();
         if (!compaction.isPresent()) {
           store.close();
           LOG.info("Compaction context is empty: {}", compactionTask);
@@ -204,26 +225,12 @@ public class CompactionThreadManager implements 
ThroughputControllerService {
   }
 
   /**
-   * Open store, and select compaction context
-   * @return Store and CompactionContext
+   * select compaction context
+   * @return CompactionContext
    */
-  Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo 
regionInfo,
+  Optional<CompactionContext> selectCompaction(HStore store, RegionInfo 
regionInfo,
       ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask 
status, String logStr)
       throws IOException {
-    status.setStatus("Open store");
-    tableDescriptors.get(regionInfo.getTable());
-    HStore store = getStore(conf, server.getFileSystem(), rootDir,
-      tableDescriptors.get(regionInfo.getTable()), regionInfo, 
cfd.getNameAsString());
-    // handle TTL case
-    store.removeUnneededFiles(false);
-    // CompactedHFilesDischarger only run on regionserver, so compactionserver 
does not have
-    // opportunity to clean compacted file at that time, we clean compacted 
files here
-    compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
-      store.getStorefiles().stream().map(sf -> 
sf.getPath().getName()).collect(Collectors.toSet()));
-    if (major) {
-      status.setStatus("Trigger major compaction");
-      store.triggerMajorCompaction();
-    }
     // get current compacting and compacted files, NOTE: these files are file 
names only, don't
     // include paths.
     status.setStatus("Get current compacting and compacted files from 
compactionFilesCache");
@@ -243,7 +250,7 @@ public class CompactionThreadManager implements 
ThroughputControllerService {
       CompactionLifeCycleTracker.DUMMY, null, excludeStoreFiles);
     LOG.info("After select store: {}, if compaction context is present: {}", 
logStr,
       compaction.isPresent());
-    return new Pair<>(store, compaction);
+    return compaction;
   }
 
   /**
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index b2df9b7..f5e82df 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3758,16 +3758,18 @@ public class HRegionServer extends AbstractServer 
implements
     }
     CompactionService.BlockingInterface cms = cmsStub;
     InetSocketAddress[] favoredNodesForRegion =
-      getFavoredNodesForRegion(regionInfo.getEncodedName());
-    CompactRequest.Builder builder =
-      
CompactRequest.newBuilder().setServer(ProtobufUtil.toServerName(getServerName()))
+        getFavoredNodesForRegion(regionInfo.getEncodedName());
+    CompactRequest.Builder builder = CompactRequest.newBuilder()
+        .setServer(ProtobufUtil.toServerName(getServerName()))
         .setRegionInfo(ProtobufUtil.toRegionInfo(regionInfo))
         
.setFamily(ProtobufUtil.toColumnFamilySchema(cfd)).setMajor(major).setPriority(priority);
-    if (favoredNodesForRegion != null) {
+    if (favoredNodesForRegion != null && favoredNodesForRegion.length > 0) {
       for (InetSocketAddress address : favoredNodesForRegion) {
         builder.addFavoredNodes(ProtobufUtil
-          .toServerName(ServerName.valueOf(address.getHostName(), 
address.getPort(), 0L)));
+            .toServerName(ServerName.valueOf(address.getHostName(), 
address.getPort(), 0L)));
       }
+    } else {
+      builder.addFavoredNodes(ProtobufUtil.toServerName(getServerName()));
     }
     CompactRequest compactRequest = builder.build();
     try {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 0569ad3..cacea25 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -118,6 +118,7 @@ import 
org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
 import 
org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 
 /**
@@ -344,14 +345,29 @@ public class HStore implements Store, HeapSize, 
StoreConfigInformation,
   }
 
   private InetSocketAddress[] getFavoredNodes() {
-    InetSocketAddress[] favoredNodes = null;
     if (region.getRegionServerServices() != null) {
-      favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
-          region.getRegionInfo().getEncodedName());
+      return region.getRegionServerServices()
+          .getFavoredNodesForRegion(region.getRegionInfo().getEncodedName());
     }
     return favoredNodes;
   }
 
+  // Favored nodes used by compaction offload
+  private InetSocketAddress[] favoredNodes = null;
+
+  // This method is not thread safe.
+  // We initialize a new store everytime for a compaction request when 
compaction offload.
+  // So the method is only called once after initializeStoreContext and before 
real do compaction.
+  public void 
assignFavoredNodesForCompactionOffload(List<HBaseProtos.ServerName> 
favoredNodes) {
+    if (CollectionUtils.isNotEmpty(favoredNodes)) {
+      this.favoredNodes = new InetSocketAddress[favoredNodes.size()];
+      for (int i = 0; i < favoredNodes.size(); i++) {
+        this.favoredNodes[i] = 
InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
+          favoredNodes.get(i).getPort());
+      }
+    }
+  }
+
   /**
    * @return MemStore Instance to use in this store.
    */
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestRegionFavoredNodesWhenCompactOffload.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestRegionFavoredNodesWhenCompactOffload.java
new file mode 100644
index 0000000..d531c05
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestRegionFavoredNodesWhenCompactOffload.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.TestRegionFavoredNodes;
+import org.apache.hadoop.hbase.testclassification.CompactionServerTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ CompactionServerTests.class, MediumTests.class })
+public class TestRegionFavoredNodesWhenCompactOffload extends 
TestRegionFavoredNodes {
+  private static HCompactionServer COMPACTION_SERVER;
+  private static final int FLUSHES = 10;
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    
HBaseClassTestRule.forClass(TestRegionFavoredNodesWhenCompactOffload.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    try {
+      checkFileSystemWithFavoredNode();
+    } catch (NoSuchMethodException nm) {
+      return;
+    }
+    TableDescriptor tableDescriptor =
+        
TableDescriptorBuilder.newBuilder(TABLE_NAME).setCompactionOffloadEnabled(true).build();
+    
TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numCompactionServers(1)
+      .numDataNodes(REGION_SERVERS).numRegionServers(REGION_SERVERS).build());
+    TEST_UTIL.getAdmin().switchCompactionOffload(true);
+    TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
+    table = TEST_UTIL.createTable(tableDescriptor, 
Bytes.toByteArrays(COLUMN_FAMILY),
+      HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE, 
TEST_UTIL.getConfiguration());
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
+    COMPACTION_SERVER = 
TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0)
+      .getCompactionServer();
+  }
+
+  @Test
+  public void testFavoredNodes() throws Exception {
+    Assume.assumeTrue(createWithFavoredNode != null);
+    InetSocketAddress[] nodes = getDataNodes();
+    String[] nodeNames = new String[REGION_SERVERS];
+    for (int i = 0; i < REGION_SERVERS; i++) {
+      nodeNames[i] = nodes[i].getAddress().getHostAddress() + ":" + 
nodes[i].getPort();
+    }
+    updateFavoredNodes(nodes);
+    // Write some data to each region and flush. Repeat some number of times to
+    // get multiple files for each region.
+    for (int i = 0; i < FLUSHES; i++) {
+      TEST_UTIL.loadTable(table, COLUMN_FAMILY, false);
+      TEST_UTIL.flush();
+    }
+    TEST_UTIL.compact(TABLE_NAME, true);
+    TEST_UTIL.waitFor(60000, () -> {
+      int hFileCount = 0;
+      for (HRegion region : 
TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) {
+        hFileCount += region.getStore(COLUMN_FAMILY).getStorefilesCount();
+
+      }
+      return hFileCount == 
HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE.length + 1;
+    });
+    checkFavoredNodes(nodeNames);
+    // To ensure do compaction on compaction server
+    TEST_UTIL.waitFor(60000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
+  }
+
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java
index 64d3cb9..2493be4 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java
@@ -55,22 +55,27 @@ public class TestRegionFavoredNodes {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestRegionFavoredNodes.class);
 
-  private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
-  private static Table table;
-  private static final TableName TABLE_NAME =
+  protected static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+  protected static Table table;
+  protected static final TableName TABLE_NAME =
       TableName.valueOf("table");
-  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("family");
+  protected static final byte[] COLUMN_FAMILY = Bytes.toBytes("family");
   private static final int FAVORED_NODES_NUM = 3;
-  private static final int REGION_SERVERS = 6;
+  protected static final int REGION_SERVERS = 6;
   private static final int FLUSHES = 3;
-  private static Method createWithFavoredNode = null;
+  protected static Method createWithFavoredNode = null;
+
+  protected static void checkFileSystemWithFavoredNode() throws Exception {
+    createWithFavoredNode = 
DistributedFileSystem.class.getDeclaredMethod("create", Path.class,
+      FsPermission.class, boolean.class, int.class, short.class, long.class, 
Progressable.class,
+      InetSocketAddress[].class);
+
+  }
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     try {
-      createWithFavoredNode = 
DistributedFileSystem.class.getDeclaredMethod("create", Path.class,
-        FsPermission.class, boolean.class, int.class, short.class, long.class,
-        Progressable.class, InetSocketAddress[].class);
+      checkFileSystemWithFavoredNode();
     } catch (NoSuchMethodException nm) {
       return;
     }
@@ -91,9 +96,36 @@ public class TestRegionFavoredNodes {
     TEST_UTIL.shutdownMiniCluster();
   }
 
-  @Test
-  public void testFavoredNodes() throws Exception {
-    Assume.assumeTrue(createWithFavoredNode != null);
+  protected void checkFavoredNodes(String[] nodeNames) throws Exception {
+    // For each region, check the block locations of each file and ensure that
+    // they are consistent with the favored nodes for that region.
+    for (int i = 0; i < REGION_SERVERS; i++) {
+      HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(i);
+      List<HRegion> regions = server.getRegions(TABLE_NAME);
+      for (HRegion region : regions) {
+        List<String> files = region.getStoreFileList(new byte[][] { 
COLUMN_FAMILY });
+        for (String file : files) {
+          FileStatus status = TEST_UTIL.getDFSCluster().getFileSystem()
+              .getFileStatus(new Path(new URI(file).getPath()));
+          BlockLocation[] lbks = ((DistributedFileSystem) 
TEST_UTIL.getDFSCluster().getFileSystem())
+              .getFileBlockLocations(status, 0, Long.MAX_VALUE);
+          for (BlockLocation lbk : lbks) {
+            locations: for (String info : lbk.getNames()) {
+              for (int j = 0; j < FAVORED_NODES_NUM; j++) {
+                if (info.equals(nodeNames[(i + j) % REGION_SERVERS])) {
+                  continue locations;
+                }
+              }
+              // This block was at a location that was not a favored location.
+              fail("Block location " + info + " not a favored node");
+            }
+          }
+        }
+      }
+    }
+  }
+
+  protected InetSocketAddress[] getDataNodes() throws Exception{
     // Get the addresses of the datanodes in the cluster.
     InetSocketAddress[] nodes = new InetSocketAddress[REGION_SERVERS];
     List<DataNode> datanodes = TEST_UTIL.getDFSCluster().getDataNodes();
@@ -106,13 +138,10 @@ public class TestRegionFavoredNodes {
     for (int i = 0; i < REGION_SERVERS; i++) {
       nodes[i] = (InetSocketAddress)selfAddress.invoke(datanodes.get(i));
     }
+    return nodes;
+  }
 
-    String[] nodeNames = new String[REGION_SERVERS];
-    for (int i = 0; i < REGION_SERVERS; i++) {
-      nodeNames[i] = nodes[i].getAddress().getHostAddress() + ":" +
-          nodes[i].getPort();
-    }
-
+  protected void updateFavoredNodes(InetSocketAddress[] nodes){
     // For each region, choose some datanodes as the favored nodes then assign
     // them as favored nodes through the region.
     for (int i = 0; i < REGION_SERVERS; i++) {
@@ -120,11 +149,11 @@ public class TestRegionFavoredNodes {
       List<HRegion> regions = server.getRegions(TABLE_NAME);
       for (HRegion region : regions) {
         
List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName>favoredNodes
 =
-            new ArrayList<>(3);
+          new ArrayList<>(3);
         String encodedRegionName = region.getRegionInfo().getEncodedName();
         for (int j = 0; j < FAVORED_NODES_NUM; j++) {
           
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName.Builder
 b =
-              
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName.newBuilder();
+            
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName.newBuilder();
           b.setHostName(nodes[(i + j) % 
REGION_SERVERS].getAddress().getHostAddress());
           b.setPort(nodes[(i + j) % REGION_SERVERS].getPort());
           b.setStartCode(-1);
@@ -133,41 +162,23 @@ public class TestRegionFavoredNodes {
         server.updateRegionFavoredNodesMapping(encodedRegionName, 
favoredNodes);
       }
     }
+  }
 
+  @Test
+  public void testFavoredNodes() throws Exception {
+    Assume.assumeTrue(createWithFavoredNode != null);
+    InetSocketAddress[] nodes = getDataNodes();
+    String[] nodeNames = new String[REGION_SERVERS];
+    for (int i = 0; i < REGION_SERVERS; i++) {
+      nodeNames[i] = nodes[i].getAddress().getHostAddress() + ":" + 
nodes[i].getPort();
+    }
+    updateFavoredNodes(nodes);
     // Write some data to each region and flush. Repeat some number of times to
     // get multiple files for each region.
     for (int i = 0; i < FLUSHES; i++) {
       TEST_UTIL.loadTable(table, COLUMN_FAMILY, false);
       TEST_UTIL.flush();
     }
-
-    // For each region, check the block locations of each file and ensure that
-    // they are consistent with the favored nodes for that region.
-    for (int i = 0; i < REGION_SERVERS; i++) {
-      HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(i);
-      List<HRegion> regions = server.getRegions(TABLE_NAME);
-      for (HRegion region : regions) {
-        List<String> files = region.getStoreFileList(new 
byte[][]{COLUMN_FAMILY});
-        for (String file : files) {
-          FileStatus status = TEST_UTIL.getDFSCluster().getFileSystem().
-              getFileStatus(new Path(new URI(file).getPath()));
-          BlockLocation[] lbks =
-              
((DistributedFileSystem)TEST_UTIL.getDFSCluster().getFileSystem())
-              .getFileBlockLocations(status, 0, Long.MAX_VALUE);
-          for (BlockLocation lbk : lbks) {
-            locations:
-              for (String info : lbk.getNames()) {
-                for (int j = 0; j < FAVORED_NODES_NUM; j++) {
-                  if (info.equals(nodeNames[(i + j) % REGION_SERVERS])) {
-                    continue locations;
-                  }
-                }
-                // This block was at a location that was not a favored 
location.
-                fail("Block location " + info + " not a favored node");
-              }
-          }
-        }
-      }
-    }
+    checkFavoredNodes(nodeNames);
   }
 }

Reply via email to