This is an automated email from the ASF dual-hosted git repository.
niuyulin pushed a commit to branch HBASE-25714
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-25714 by this push:
new 68800b5 HBASE-26076 Support favoredNodes when do compaction offload
(#3468)
68800b5 is described below
commit 68800b52409e12a4a9c3d01917d809423e2869cd
Author: niuyulin <[email protected]>
AuthorDate: Mon Aug 16 18:41:45 2021 +0800
HBASE-26076 Support favoredNodes when do compaction offload (#3468)
Signed-off-by: Duo Zhang <[email protected]>
---
.../hbase/compactionserver/CSRpcServices.java | 11 +--
.../compactionserver/CompactionThreadManager.java | 49 +++++----
.../hadoop/hbase/regionserver/HRegionServer.java | 12 ++-
.../apache/hadoop/hbase/regionserver/HStore.java | 22 ++++-
.../TestRegionFavoredNodesWhenCompactOffload.java | 96 ++++++++++++++++++
.../hbase/regionserver/TestRegionFavoredNodes.java | 109 ++++++++++++---------
6 files changed, 214 insertions(+), 85 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
index 25cfa1a..7faa665 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CSRpcServices.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.compactionserver;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -42,7 +41,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@InterfaceAudience.Private
public class CSRpcServices extends AbstractRpcServices
@@ -96,12 +94,11 @@ public class CSRpcServices extends AbstractRpcServices
ColumnFamilyDescriptor cfd =
ProtobufUtil.toColumnFamilyDescriptor(request.getFamily());
boolean major = request.getMajor();
int priority = request.getPriority();
- List<HBaseProtos.ServerName> favoredNodes =
Collections.singletonList(request.getServer());
LOG.info("Receive compaction request from {}",
ProtobufUtil.toString(request));
- CompactionTask compactionTask =
-
CompactionTask.newBuilder().setRsServerName(rsServerName).setRegionInfo(regionInfo)
-
.setColumnFamilyDescriptor(cfd).setRequestMajor(major).setPriority(priority)
-
.setFavoredNodes(favoredNodes).setSubmitTime(System.currentTimeMillis()).build();
+ CompactionTask compactionTask =
CompactionTask.newBuilder().setRsServerName(rsServerName)
+
.setRegionInfo(regionInfo).setColumnFamilyDescriptor(cfd).setRequestMajor(major)
+ .setPriority(priority).setFavoredNodes(request.getFavoredNodesList())
+ .setSubmitTime(System.currentTimeMillis()).build();
try {
compactionServer.compactionThreadManager.requestCompaction(compactionTask);
return CompactionProtos.CompactResponse.newBuilder().build();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
index 2eb697d..f5fcfe2 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/compactionserver/CompactionThreadManager.java
@@ -150,6 +150,27 @@ public class CompactionThreadManager implements
ThroughputControllerService {
}
}
+ /**
+ * Open store, and clean stale compacted file in cache
+ */
+ private HStore openStore(RegionInfo regionInfo, ColumnFamilyDescriptor cfd,
boolean major,
+ MonitoredTask status) throws IOException {
+ status.setStatus("Open store");
+ HStore store = getStore(conf, server.getFileSystem(), rootDir,
+ tableDescriptors.get(regionInfo.getTable()), regionInfo,
cfd.getNameAsString());
+ // handle TTL case
+ store.removeUnneededFiles(false);
+ // CompactedHFilesDischarger only run on regionserver, so compactionserver
does not have
+ // opportunity to clean compacted file at that time, we clean compacted
files here
+ compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
+ store.getStorefiles().stream().map(sf ->
sf.getPath().getName()).collect(Collectors.toSet()));
+ if (major) {
+ status.setStatus("Trigger major compaction");
+ store.triggerMajorCompaction();
+ }
+ return store;
+ }
+
private void selectFileAndExecuteTask(CompactionTask compactionTask) throws
IOException {
ServerName rsServerName = compactionTask.getRsServerName();
RegionInfo regionInfo = compactionTask.getRegionInfo();
@@ -169,10 +190,10 @@ public class CompactionThreadManager implements
ThroughputControllerService {
// the three has consistent state, we need this condition to guarantee
correct selection
synchronized
(compactionFilesCache.getCompactedStoreFilesAsLock(regionInfo, cfd)) {
synchronized
(compactionFilesCache.getSelectedStoreFilesAsLock(regionInfo, cfd)) {
- Pair<HStore, Optional<CompactionContext>> pair =
selectCompaction(regionInfo, cfd,
+ store = openStore(regionInfo, cfd, compactionTask.isRequestMajor(),
status);
+
store.assignFavoredNodesForCompactionOffload(compactionTask.getFavoredNodes());
+ Optional<CompactionContext> compaction = selectCompaction(store,
regionInfo, cfd,
compactionTask.isRequestMajor(), compactionTask.getPriority(),
status, logStr);
- store = pair.getFirst();
- Optional<CompactionContext> compaction = pair.getSecond();
if (!compaction.isPresent()) {
store.close();
LOG.info("Compaction context is empty: {}", compactionTask);
@@ -204,26 +225,12 @@ public class CompactionThreadManager implements
ThroughputControllerService {
}
/**
- * Open store, and select compaction context
- * @return Store and CompactionContext
+ * select compaction context
+ * @return CompactionContext
*/
- Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo
regionInfo,
+ Optional<CompactionContext> selectCompaction(HStore store, RegionInfo
regionInfo,
ColumnFamilyDescriptor cfd, boolean major, int priority, MonitoredTask
status, String logStr)
throws IOException {
- status.setStatus("Open store");
- tableDescriptors.get(regionInfo.getTable());
- HStore store = getStore(conf, server.getFileSystem(), rootDir,
- tableDescriptors.get(regionInfo.getTable()), regionInfo,
cfd.getNameAsString());
- // handle TTL case
- store.removeUnneededFiles(false);
- // CompactedHFilesDischarger only run on regionserver, so compactionserver
does not have
- // opportunity to clean compacted file at that time, we clean compacted
files here
- compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
- store.getStorefiles().stream().map(sf ->
sf.getPath().getName()).collect(Collectors.toSet()));
- if (major) {
- status.setStatus("Trigger major compaction");
- store.triggerMajorCompaction();
- }
// get current compacting and compacted files, NOTE: these files are file
names only, don't
// include paths.
status.setStatus("Get current compacting and compacted files from
compactionFilesCache");
@@ -243,7 +250,7 @@ public class CompactionThreadManager implements
ThroughputControllerService {
CompactionLifeCycleTracker.DUMMY, null, excludeStoreFiles);
LOG.info("After select store: {}, if compaction context is present: {}",
logStr,
compaction.isPresent());
- return new Pair<>(store, compaction);
+ return compaction;
}
/**
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index b2df9b7..f5e82df 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3758,16 +3758,18 @@ public class HRegionServer extends AbstractServer
implements
}
CompactionService.BlockingInterface cms = cmsStub;
InetSocketAddress[] favoredNodesForRegion =
- getFavoredNodesForRegion(regionInfo.getEncodedName());
- CompactRequest.Builder builder =
-
CompactRequest.newBuilder().setServer(ProtobufUtil.toServerName(getServerName()))
+ getFavoredNodesForRegion(regionInfo.getEncodedName());
+ CompactRequest.Builder builder = CompactRequest.newBuilder()
+ .setServer(ProtobufUtil.toServerName(getServerName()))
.setRegionInfo(ProtobufUtil.toRegionInfo(regionInfo))
.setFamily(ProtobufUtil.toColumnFamilySchema(cfd)).setMajor(major).setPriority(priority);
- if (favoredNodesForRegion != null) {
+ if (favoredNodesForRegion != null && favoredNodesForRegion.length > 0) {
for (InetSocketAddress address : favoredNodesForRegion) {
builder.addFavoredNodes(ProtobufUtil
- .toServerName(ServerName.valueOf(address.getHostName(),
address.getPort(), 0L)));
+ .toServerName(ServerName.valueOf(address.getHostName(),
address.getPort(), 0L)));
}
+ } else {
+ builder.addFavoredNodes(ProtobufUtil.toServerName(getServerName()));
}
CompactRequest compactRequest = builder.build();
try {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 0569ad3..cacea25 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -118,6 +118,7 @@ import
org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUti
import
org.apache.hbase.thirdparty.org.apache.commons.collections4.IterableUtils;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
/**
@@ -344,14 +345,29 @@ public class HStore implements Store, HeapSize,
StoreConfigInformation,
}
private InetSocketAddress[] getFavoredNodes() {
- InetSocketAddress[] favoredNodes = null;
if (region.getRegionServerServices() != null) {
- favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
- region.getRegionInfo().getEncodedName());
+ return region.getRegionServerServices()
+ .getFavoredNodesForRegion(region.getRegionInfo().getEncodedName());
}
return favoredNodes;
}
+ // Favored nodes used by compaction offload
+ private InetSocketAddress[] favoredNodes = null;
+
+ // This method is not thread safe.
+ // We initialize a new store everytime for a compaction request when
compaction offload.
+ // So the method is only called once after initializeStoreContext and before
real do compaction.
+ public void
assignFavoredNodesForCompactionOffload(List<HBaseProtos.ServerName>
favoredNodes) {
+ if (CollectionUtils.isNotEmpty(favoredNodes)) {
+ this.favoredNodes = new InetSocketAddress[favoredNodes.size()];
+ for (int i = 0; i < favoredNodes.size(); i++) {
+ this.favoredNodes[i] =
InetSocketAddress.createUnresolved(favoredNodes.get(i).getHostName(),
+ favoredNodes.get(i).getPort());
+ }
+ }
+ }
+
/**
* @return MemStore Instance to use in this store.
*/
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestRegionFavoredNodesWhenCompactOffload.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestRegionFavoredNodesWhenCompactOffload.java
new file mode 100644
index 0000000..d531c05
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/compactionserver/TestRegionFavoredNodesWhenCompactOffload.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.compactionserver;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.StartMiniClusterOption;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.TestRegionFavoredNodes;
+import org.apache.hadoop.hbase.testclassification.CompactionServerTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ CompactionServerTests.class, MediumTests.class })
+public class TestRegionFavoredNodesWhenCompactOffload extends
TestRegionFavoredNodes {
+ private static HCompactionServer COMPACTION_SERVER;
+ private static final int FLUSHES = 10;
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+
HBaseClassTestRule.forClass(TestRegionFavoredNodesWhenCompactOffload.class);
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ try {
+ checkFileSystemWithFavoredNode();
+ } catch (NoSuchMethodException nm) {
+ return;
+ }
+ TableDescriptor tableDescriptor =
+
TableDescriptorBuilder.newBuilder(TABLE_NAME).setCompactionOffloadEnabled(true).build();
+
TEST_UTIL.startMiniCluster(StartMiniClusterOption.builder().numCompactionServers(1)
+ .numDataNodes(REGION_SERVERS).numRegionServers(REGION_SERVERS).build());
+ TEST_UTIL.getAdmin().switchCompactionOffload(true);
+ TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster();
+ table = TEST_UTIL.createTable(tableDescriptor,
Bytes.toByteArrays(COLUMN_FAMILY),
+ HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE,
TEST_UTIL.getConfiguration());
+ TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
+ COMPACTION_SERVER =
TEST_UTIL.getMiniHBaseCluster().getCompactionServerThreads().get(0)
+ .getCompactionServer();
+ }
+
+ @Test
+ public void testFavoredNodes() throws Exception {
+ Assume.assumeTrue(createWithFavoredNode != null);
+ InetSocketAddress[] nodes = getDataNodes();
+ String[] nodeNames = new String[REGION_SERVERS];
+ for (int i = 0; i < REGION_SERVERS; i++) {
+ nodeNames[i] = nodes[i].getAddress().getHostAddress() + ":" +
nodes[i].getPort();
+ }
+ updateFavoredNodes(nodes);
+ // Write some data to each region and flush. Repeat some number of times to
+ // get multiple files for each region.
+ for (int i = 0; i < FLUSHES; i++) {
+ TEST_UTIL.loadTable(table, COLUMN_FAMILY, false);
+ TEST_UTIL.flush();
+ }
+ TEST_UTIL.compact(TABLE_NAME, true);
+ TEST_UTIL.waitFor(60000, () -> {
+ int hFileCount = 0;
+ for (HRegion region :
TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) {
+ hFileCount += region.getStore(COLUMN_FAMILY).getStorefilesCount();
+
+ }
+ return hFileCount ==
HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE.length + 1;
+ });
+ checkFavoredNodes(nodeNames);
+ // To ensure do compaction on compaction server
+ TEST_UTIL.waitFor(60000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
+ }
+
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java
index 64d3cb9..2493be4 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java
@@ -55,22 +55,27 @@ public class TestRegionFavoredNodes {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionFavoredNodes.class);
- private static final HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
- private static Table table;
- private static final TableName TABLE_NAME =
+ protected static final HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
+ protected static Table table;
+ protected static final TableName TABLE_NAME =
TableName.valueOf("table");
- private static final byte[] COLUMN_FAMILY = Bytes.toBytes("family");
+ protected static final byte[] COLUMN_FAMILY = Bytes.toBytes("family");
private static final int FAVORED_NODES_NUM = 3;
- private static final int REGION_SERVERS = 6;
+ protected static final int REGION_SERVERS = 6;
private static final int FLUSHES = 3;
- private static Method createWithFavoredNode = null;
+ protected static Method createWithFavoredNode = null;
+
+ protected static void checkFileSystemWithFavoredNode() throws Exception {
+ createWithFavoredNode =
DistributedFileSystem.class.getDeclaredMethod("create", Path.class,
+ FsPermission.class, boolean.class, int.class, short.class, long.class,
Progressable.class,
+ InetSocketAddress[].class);
+
+ }
@BeforeClass
public static void setUpBeforeClass() throws Exception {
try {
- createWithFavoredNode =
DistributedFileSystem.class.getDeclaredMethod("create", Path.class,
- FsPermission.class, boolean.class, int.class, short.class, long.class,
- Progressable.class, InetSocketAddress[].class);
+ checkFileSystemWithFavoredNode();
} catch (NoSuchMethodException nm) {
return;
}
@@ -91,9 +96,36 @@ public class TestRegionFavoredNodes {
TEST_UTIL.shutdownMiniCluster();
}
- @Test
- public void testFavoredNodes() throws Exception {
- Assume.assumeTrue(createWithFavoredNode != null);
+ protected void checkFavoredNodes(String[] nodeNames) throws Exception {
+ // For each region, check the block locations of each file and ensure that
+ // they are consistent with the favored nodes for that region.
+ for (int i = 0; i < REGION_SERVERS; i++) {
+ HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(i);
+ List<HRegion> regions = server.getRegions(TABLE_NAME);
+ for (HRegion region : regions) {
+ List<String> files = region.getStoreFileList(new byte[][] {
COLUMN_FAMILY });
+ for (String file : files) {
+ FileStatus status = TEST_UTIL.getDFSCluster().getFileSystem()
+ .getFileStatus(new Path(new URI(file).getPath()));
+ BlockLocation[] lbks = ((DistributedFileSystem)
TEST_UTIL.getDFSCluster().getFileSystem())
+ .getFileBlockLocations(status, 0, Long.MAX_VALUE);
+ for (BlockLocation lbk : lbks) {
+ locations: for (String info : lbk.getNames()) {
+ for (int j = 0; j < FAVORED_NODES_NUM; j++) {
+ if (info.equals(nodeNames[(i + j) % REGION_SERVERS])) {
+ continue locations;
+ }
+ }
+ // This block was at a location that was not a favored location.
+ fail("Block location " + info + " not a favored node");
+ }
+ }
+ }
+ }
+ }
+ }
+
+ protected InetSocketAddress[] getDataNodes() throws Exception{
// Get the addresses of the datanodes in the cluster.
InetSocketAddress[] nodes = new InetSocketAddress[REGION_SERVERS];
List<DataNode> datanodes = TEST_UTIL.getDFSCluster().getDataNodes();
@@ -106,13 +138,10 @@ public class TestRegionFavoredNodes {
for (int i = 0; i < REGION_SERVERS; i++) {
nodes[i] = (InetSocketAddress)selfAddress.invoke(datanodes.get(i));
}
+ return nodes;
+ }
- String[] nodeNames = new String[REGION_SERVERS];
- for (int i = 0; i < REGION_SERVERS; i++) {
- nodeNames[i] = nodes[i].getAddress().getHostAddress() + ":" +
- nodes[i].getPort();
- }
-
+ protected void updateFavoredNodes(InetSocketAddress[] nodes){
// For each region, choose some datanodes as the favored nodes then assign
// them as favored nodes through the region.
for (int i = 0; i < REGION_SERVERS; i++) {
@@ -120,11 +149,11 @@ public class TestRegionFavoredNodes {
List<HRegion> regions = server.getRegions(TABLE_NAME);
for (HRegion region : regions) {
List<org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName>favoredNodes
=
- new ArrayList<>(3);
+ new ArrayList<>(3);
String encodedRegionName = region.getRegionInfo().getEncodedName();
for (int j = 0; j < FAVORED_NODES_NUM; j++) {
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName.Builder
b =
-
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName.newBuilder();
+
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ServerName.newBuilder();
b.setHostName(nodes[(i + j) %
REGION_SERVERS].getAddress().getHostAddress());
b.setPort(nodes[(i + j) % REGION_SERVERS].getPort());
b.setStartCode(-1);
@@ -133,41 +162,23 @@ public class TestRegionFavoredNodes {
server.updateRegionFavoredNodesMapping(encodedRegionName,
favoredNodes);
}
}
+ }
+ @Test
+ public void testFavoredNodes() throws Exception {
+ Assume.assumeTrue(createWithFavoredNode != null);
+ InetSocketAddress[] nodes = getDataNodes();
+ String[] nodeNames = new String[REGION_SERVERS];
+ for (int i = 0; i < REGION_SERVERS; i++) {
+ nodeNames[i] = nodes[i].getAddress().getHostAddress() + ":" +
nodes[i].getPort();
+ }
+ updateFavoredNodes(nodes);
// Write some data to each region and flush. Repeat some number of times to
// get multiple files for each region.
for (int i = 0; i < FLUSHES; i++) {
TEST_UTIL.loadTable(table, COLUMN_FAMILY, false);
TEST_UTIL.flush();
}
-
- // For each region, check the block locations of each file and ensure that
- // they are consistent with the favored nodes for that region.
- for (int i = 0; i < REGION_SERVERS; i++) {
- HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(i);
- List<HRegion> regions = server.getRegions(TABLE_NAME);
- for (HRegion region : regions) {
- List<String> files = region.getStoreFileList(new
byte[][]{COLUMN_FAMILY});
- for (String file : files) {
- FileStatus status = TEST_UTIL.getDFSCluster().getFileSystem().
- getFileStatus(new Path(new URI(file).getPath()));
- BlockLocation[] lbks =
-
((DistributedFileSystem)TEST_UTIL.getDFSCluster().getFileSystem())
- .getFileBlockLocations(status, 0, Long.MAX_VALUE);
- for (BlockLocation lbk : lbks) {
- locations:
- for (String info : lbk.getNames()) {
- for (int j = 0; j < FAVORED_NODES_NUM; j++) {
- if (info.equals(nodeNames[(i + j) % REGION_SERVERS])) {
- continue locations;
- }
- }
- // This block was at a location that was not a favored
location.
- fail("Block location " + info + " not a favored node");
- }
- }
- }
- }
- }
+ checkFavoredNodes(nodeNames);
}
}