This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 8e3727e HBASE-25281 Bulkload split hfile too many times due to
unreasonable split point (#2692)
8e3727e is described below
commit 8e3727ea06e85f37247f060598a54d37b303f21a
Author: niuyulin <[email protected]>
AuthorDate: Wed Nov 25 15:27:09 2020 +0800
HBASE-25281 Bulkload split hfile too many times due to unreasonable split
point (#2692)
Signed-off-by: Guanghao Zhang <[email protected]>
---
.../hadoop/hbase/tool/LoadIncrementalHFiles.java | 92 ++++++++++++++--------
.../TestLoadIncrementalHFilesSplitRecovery.java | 28 +++++++
2 files changed, 85 insertions(+), 35 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index ddc857c..866faf2 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -723,6 +723,45 @@ public class LoadIncrementalHFiles extends Configured
implements Tool {
}
/**
+ * @param startEndKeys the start/end keys of the regions belonging to this table,
+ * sorted in ascending order by start key
+ * @param key the key whose containing region is to be found
+ * @return the index of the region that would contain the key, or -1 if the key sorts before
+ * the first start key
+ */
+ private int getRegionIndex(final Pair<byte[][], byte[][]> startEndKeys,
byte[] key) {
+ int idx = Arrays.binarySearch(startEndKeys.getFirst(), key,
Bytes.BYTES_COMPARATOR);
+ if (idx < 0) {
+ // not on boundary, binarySearch returns (-(insertion point) - 1). Calculate
+ // the region it would be in.
+ idx = -(idx + 1) - 1;
+ }
+ return idx;
+ }
+
+ /**
+ * We consider there to be a region hole under any of the following conditions: 1) idx < 0,
+ * meaning the first region info is lost; 2) the end key of a region is not equal to the
+ * start key of the next region; 3) the end key of the last region is not empty.
+ */
+ private void checkRegionIndexValid(int idx, final Pair<byte[][], byte[][]>
startEndKeys,
+ TableName tableName) throws IOException {
+ if (idx < 0) {
+ throw new IOException("The first region info for table " + tableName +
+ " can't be found in hbase:meta.Please use hbck tool to fix it first.");
+ } else if ((idx == startEndKeys.getFirst().length - 1) &&
+ !Bytes.equals(startEndKeys.getSecond()[idx],
HConstants.EMPTY_BYTE_ARRAY)) {
+ throw new IOException("The last region info for table " + tableName +
+ " can't be found in hbase:meta.Please use hbck tool to fix it first.");
+ } else if (idx + 1 < startEndKeys.getFirst().length &&
+ !(Bytes.compareTo(startEndKeys.getSecond()[idx],
+ startEndKeys.getFirst()[idx + 1]) == 0)) {
+ throw new IOException("The endkey of one region for table " + tableName +
+ " is not equal to the startkey of the next region in hbase:meta." +
+ "Please use hbck tool to fix it first.");
+ }
+ }
+
+ /**
* Attempt to assign the given load queue item into its target region group.
If the hfile boundary
* no longer fits into a region, physically splits the hfile such that the
new bottom half will
* fit and returns the list of LQI's corresponding to the resultant hfiles.
@@ -745,8 +784,8 @@ public class LoadIncrementalHFiles extends Configured
implements Tool {
return new Pair<>(null, hfilePath.getName());
}
- LOG.info("Trying to load hfile=" + hfilePath + " first=" +
first.map(Bytes::toStringBinary) +
- " last=" + last.map(Bytes::toStringBinary));
+ LOG.info("Trying to load hfile=" + hfilePath + " first=" +
first.map(Bytes::toStringBinary)
+ + " last=" + last.map(Bytes::toStringBinary));
if (!first.isPresent() || !last.isPresent()) {
assert !first.isPresent() && !last.isPresent();
// TODO what if this is due to a bad HFile?
@@ -754,47 +793,30 @@ public class LoadIncrementalHFiles extends Configured
implements Tool {
return null;
}
if (Bytes.compareTo(first.get(), last.get()) > 0) {
- throw new IllegalArgumentException("Invalid range: " +
Bytes.toStringBinary(first.get()) +
- " > " + Bytes.toStringBinary(last.get()));
- }
- int idx = Arrays.binarySearch(startEndKeys.getFirst(), first.get(),
Bytes.BYTES_COMPARATOR);
- if (idx < 0) {
- // not on boundary, returns -(insertion index). Calculate region it
- // would be in.
- idx = -(idx + 1) - 1;
- }
- int indexForCallable = idx;
-
- /**
- * we can consider there is a region hole in following conditions. 1) if
idx < 0,then first
- * region info is lost. 2) if the endkey of a region is not equal to the
startkey of the next
- * region. 3) if the endkey of the last region is not empty.
- */
- if (indexForCallable < 0) {
- throw new IOException("The first region info for table " +
table.getName() +
- " can't be found in hbase:meta.Please use hbck tool to fix it
first.");
- } else if ((indexForCallable == startEndKeys.getFirst().length - 1) &&
- !Bytes.equals(startEndKeys.getSecond()[indexForCallable],
HConstants.EMPTY_BYTE_ARRAY)) {
- throw new IOException("The last region info for table " +
table.getName() +
- " can't be found in hbase:meta.Please use hbck tool to fix it
first.");
- } else if (indexForCallable + 1 < startEndKeys.getFirst().length &&
- !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
- startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
- throw new IOException("The endkey of one region for table " +
table.getName() +
- " is not equal to the startkey of the next region in hbase:meta." +
- "Please use hbck tool to fix it first.");
+ throw new IllegalArgumentException("Invalid range: " +
Bytes.toStringBinary(first.get())
+ + " > " + Bytes.toStringBinary(last.get()));
}
- boolean lastKeyInRange = Bytes.compareTo(last.get(),
startEndKeys.getSecond()[idx]) < 0 ||
- Bytes.equals(startEndKeys.getSecond()[idx],
HConstants.EMPTY_BYTE_ARRAY);
+ int firstKeyRegionIdx = getRegionIndex(startEndKeys, first.get());
+ checkRegionIndexValid(firstKeyRegionIdx, startEndKeys, table.getName());
+ boolean lastKeyInRange =
+ Bytes.compareTo(last.get(),
startEndKeys.getSecond()[firstKeyRegionIdx]) < 0 || Bytes
+ .equals(startEndKeys.getSecond()[firstKeyRegionIdx],
HConstants.EMPTY_BYTE_ARRAY);
if (!lastKeyInRange) {
+ int lastKeyRegionIdx = getRegionIndex(startEndKeys, last.get());
+ int splitIdx = (firstKeyRegionIdx + lastKeyRegionIdx) >>> 1;
+ // Make sure the split point is valid in case regions overlap; without this check the
+ // split point may be bigger than the hfile's end key.
+ if (splitIdx != firstKeyRegionIdx) {
+ checkRegionIndexValid(splitIdx, startEndKeys, table.getName());
+ }
List<LoadQueueItem> lqis = splitStoreFile(item, table,
- startEndKeys.getFirst()[indexForCallable],
startEndKeys.getSecond()[indexForCallable]);
+ startEndKeys.getFirst()[firstKeyRegionIdx],
startEndKeys.getSecond()[splitIdx]);
return new Pair<>(lqis, null);
}
// group regions.
- regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
+
regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[firstKeyRegionIdx]),
item);
return null;
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
index 05c2457..c65e3f2 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
@@ -470,6 +470,34 @@ public class TestLoadIncrementalHFilesSplitRecovery {
}
}
+ @Test
+ public void testCorrectSplitPoint() throws Exception {
+ final TableName table = TableName.valueOf(name.getMethodName());
+ byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
+ Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"),
Bytes.toBytes("row_00000040"),
+ Bytes.toBytes("row_00000050"), Bytes.toBytes("row_00000060"),
+ Bytes.toBytes("row_00000070") };
+ setupTableWithSplitkeys(table, NUM_CFS, SPLIT_KEYS);
+
+ final AtomicInteger bulkloadRpcTimes = new AtomicInteger();
+ BulkLoadHFilesTool loader = new
BulkLoadHFilesTool(util.getConfiguration()) {
+
+ @Override
+ protected void bulkLoadPhase(Table table, Connection conn,
ExecutorService pool,
+ Deque<LoadIncrementalHFiles.LoadQueueItem> queue,
+ Multimap<ByteBuffer, LoadIncrementalHFiles.LoadQueueItem>
regionGroups, boolean copyFile,
+ Map<LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> item2RegionMap)
throws IOException {
+ bulkloadRpcTimes.addAndGet(1);
+ super.bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile,
item2RegionMap);
+ }
+ };
+
+ Path dir = buildBulkFiles(table, 1);
+ loader.bulkLoad(table, dir);
+ // before HBASE-25281 we needed to invoke the bulkload rpc 8 times
+ assertEquals(4, bulkloadRpcTimes.get());
+ }
+
/**
* This test creates a table with many small regions. The bulk load files
would be splitted
* multiple times before all of them can be loaded successfully.