This is an automated email from the ASF dual-hosted git repository.
meszibalu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new a7dab37b7f2 HBASE-22939 SpaceQuotas - Bulkload from different hdfs
failed when space quotas are turned on. (#4748)
a7dab37b7f2 is described below
commit a7dab37b7f28a83fdd63e04a51d37e86e650970e
Author: Istvan Toth <[email protected]>
AuthorDate: Wed Aug 31 15:47:52 2022 +0200
HBASE-22939 SpaceQuotas - Bulkload from different hdfs failed when space
quotas are turned on. (#4748)
Signed-off-by: Sakthi <[email protected]>
---
...estReplicationSyncUpToolWithBulkLoadedData.java | 53 ++++++++++++++++++----
.../hadoop/hbase/regionserver/RSRpcServices.java | 12 ++++-
2 files changed, 55 insertions(+), 10 deletions(-)
diff --git
a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
index 9184717d579..8c26547ee7c 100644
---
a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
+++
b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.quotas.QuotaUtil;
import
org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -64,6 +65,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData
extends TestReplication
protected void customizeClusterConf(Configuration conf) {
conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
+ conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
conf.set("hbase.replication.source.fs.conf.provider",
TestSourceFSConfigurationProvider.class.getCanonicalName());
String classes = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
@@ -82,10 +84,10 @@ public class TestReplicationSyncUpToolWithBulkLoadedData
extends TestReplication
setupReplication();
/**
- * Prepare 16 random hfile ranges required for creating hfiles
+ * Prepare 24 random hfile ranges required for creating hfiles
*/
- Set<String> randomHFileRanges = new HashSet<>(16);
- for (int i = 0; i < 16; i++) {
+ Set<String> randomHFileRanges = new HashSet<>(24);
+ for (int i = 0; i < 24; i++) {
randomHFileRanges.add(HBaseTestingUtility.getRandomUUID().toString());
}
List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
@@ -93,8 +95,9 @@ public class TestReplicationSyncUpToolWithBulkLoadedData
extends TestReplication
Iterator<String> randomHFileRangeListIterator =
randomHFileRangeList.iterator();
/**
- * at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep
t2_syncup: Load 200 rows
- * into cf1, and 3 rows into norep verify correctly replicated to slave
+ * at Master: t1_syncup: Load 50 rows into cf1, and 50 rows from other
hdfs into cf1, and 3 rows
+ * into norep t2_syncup: Load 100 rows into cf1, and 100 rows from other
hdfs into cf1, and 3
+ * rows into norep verify correctly replicated to slave
*/
loadAndReplicateHFiles(true, randomHFileRangeListIterator);
@@ -175,11 +178,17 @@ public class TestReplicationSyncUpToolWithBulkLoadedData
extends TestReplication
Iterator<String> randomHFileRangeListIterator) throws Exception {
LOG.debug("loadAndReplicateHFiles");
- // Load 100 + 3 hfiles to t1_syncup.
+ // Load 50 + 50 + 3 hfiles to t1_syncup.
byte[][][] hfileRanges =
new byte[][][] { new byte[][] {
Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
- loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY,
ht1Source, hfileRanges, 100);
+ loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY,
ht1Source, hfileRanges, 50);
+
+ hfileRanges =
+ new byte[][][] { new byte[][] {
Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row,
FAMILY, ht1Source,
+ hfileRanges, 50);
hfileRanges =
new byte[][][] { new byte[][] {
Bytes.toBytes(randomHFileRangeListIterator.next()),
@@ -187,11 +196,17 @@ public class TestReplicationSyncUpToolWithBulkLoadedData
extends TestReplication
loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY,
ht1Source,
hfileRanges, 3);
- // Load 200 + 3 hfiles to t2_syncup.
+ // Load 100 + 100 + 3 hfiles to t2_syncup.
+ hfileRanges =
+ new byte[][][] { new byte[][] {
Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY,
ht2Source, hfileRanges, 100);
+
hfileRanges =
new byte[][][] { new byte[][] {
Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
- loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY,
ht2Source, hfileRanges, 200);
+ loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row,
FAMILY, ht2Source,
+ hfileRanges, 100);
hfileRanges =
new byte[][][] { new byte[][] {
Bytes.toBytes(randomHFileRangeListIterator.next()),
@@ -229,6 +244,26 @@ public class TestReplicationSyncUpToolWithBulkLoadedData
extends TestReplication
loader.bulkLoad(tableName, dir);
}
+ private void loadFromOtherHDFSAndValidateHFileReplication(String testName,
byte[] row, byte[] fam,
+ Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
+ Path dir = UTIL2.getDataTestDirOnTestFS(testName);
+ FileSystem fs = UTIL2.getTestFileSystem();
+ dir = dir.makeQualified(fs);
+ Path familyDir = new Path(dir, Bytes.toString(fam));
+
+ int hfileIdx = 0;
+ for (byte[][] range : hfileRanges) {
+ byte[] from = range[0];
+ byte[] to = range[1];
+ HFileTestUtil.createHFile(UTIL2.getConfiguration(), fs,
+ new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to,
numOfRows);
+ }
+
+ final TableName tableName = source.getName();
+ BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
+ loader.bulkLoad(tableName, dir);
+ }
+
private void wait(Table target, int expectedCount, String msg)
throws IOException, InterruptedException {
for (int i = 0; i < NB_RETRIES; i++) {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index b3cc6ab1092..6c46be96ca9 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.CacheEvictionStats;
@@ -2416,7 +2417,7 @@ public class RSRpcServices implements
HBaseRPCErrorHandler, AdminService.Blockin
filePaths.add(familyPath.getPath());
}
// Check if the batch of files exceeds the current quota
- sizeToBeLoaded =
enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
+ sizeToBeLoaded =
enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
}
}
@@ -2521,6 +2522,15 @@ public class RSRpcServices implements
HBaseRPCErrorHandler, AdminService.Blockin
}
}
+ private FileSystem getFileSystem(List<String> filePaths) throws IOException {
+ if (filePaths.isEmpty()) {
+ // local hdfs
+ return regionServer.getFileSystem();
+ }
+ // source hdfs
+ return new
Path(filePaths.get(0)).getFileSystem(regionServer.getConfiguration());
+ }
+
private com.google.protobuf.Message execServiceOnRegion(HRegion region,
final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
// ignore the passed in controller (from the serialized call)