This is an automated email from the ASF dual-hosted git repository.

meszibalu pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new a7dab37b7f2 HBASE-22939 SpaceQuotas - Bulkload from different hdfs failed when space quotas are turned on. (#4748)
a7dab37b7f2 is described below

commit a7dab37b7f28a83fdd63e04a51d37e86e650970e
Author: Istvan Toth <[email protected]>
AuthorDate: Wed Aug 31 15:47:52 2022 +0200

    HBASE-22939 SpaceQuotas - Bulkload from different hdfs failed when space quotas are turned on. (#4748)
    
    Signed-off-by: Sakthi <[email protected]>
---
 ...estReplicationSyncUpToolWithBulkLoadedData.java | 53 ++++++++++++++++++----
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 12 ++++-
 2 files changed, 55 insertions(+), 10 deletions(-)

diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
index 9184717d579..8c26547ee7c 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.quotas.QuotaUtil;
 import 
org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -64,6 +65,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData 
extends TestReplication
   protected void customizeClusterConf(Configuration conf) {
     conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
     conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
+    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
     conf.set("hbase.replication.source.fs.conf.provider",
       TestSourceFSConfigurationProvider.class.getCanonicalName());
     String classes = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
@@ -82,10 +84,10 @@ public class TestReplicationSyncUpToolWithBulkLoadedData 
extends TestReplication
     setupReplication();
 
     /**
-     * Prepare 16 random hfile ranges required for creating hfiles
+     * Prepare 24 random hfile ranges required for creating hfiles
      */
-    Set<String> randomHFileRanges = new HashSet<>(16);
-    for (int i = 0; i < 16; i++) {
+    Set<String> randomHFileRanges = new HashSet<>(24);
+    for (int i = 0; i < 24; i++) {
       randomHFileRanges.add(HBaseTestingUtility.getRandomUUID().toString());
     }
     List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
@@ -93,8 +95,9 @@ public class TestReplicationSyncUpToolWithBulkLoadedData 
extends TestReplication
     Iterator<String> randomHFileRangeListIterator = 
randomHFileRangeList.iterator();
 
     /**
-     * at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep 
t2_syncup: Load 200 rows
-     * into cf1, and 3 rows into norep verify correctly replicated to slave
+     * at Master: t1_syncup: Load 50 rows into cf1, and 50 rows from other 
hdfs into cf1, and 3 rows
+     * into norep t2_syncup: Load 100 rows into cf1, and 100 rows from other 
hdfs into cf1, and 3
+     * rows into norep verify correctly replicated to slave
      */
     loadAndReplicateHFiles(true, randomHFileRangeListIterator);
 
@@ -175,11 +178,17 @@ public class TestReplicationSyncUpToolWithBulkLoadedData 
extends TestReplication
     Iterator<String> randomHFileRangeListIterator) throws Exception {
     LOG.debug("loadAndReplicateHFiles");
 
-    // Load 100 + 3 hfiles to t1_syncup.
+    // Load 50 + 50 + 3 hfiles to t1_syncup.
     byte[][][] hfileRanges =
       new byte[][][] { new byte[][] { 
Bytes.toBytes(randomHFileRangeListIterator.next()),
         Bytes.toBytes(randomHFileRangeListIterator.next()) } };
-    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, 
ht1Source, hfileRanges, 100);
+    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, 
ht1Source, hfileRanges, 50);
+
+    hfileRanges =
+      new byte[][][] { new byte[][] { 
Bytes.toBytes(randomHFileRangeListIterator.next()),
+        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, 
FAMILY, ht1Source,
+      hfileRanges, 50);
 
     hfileRanges =
       new byte[][][] { new byte[][] { 
Bytes.toBytes(randomHFileRangeListIterator.next()),
@@ -187,11 +196,17 @@ public class TestReplicationSyncUpToolWithBulkLoadedData 
extends TestReplication
     loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, 
ht1Source,
       hfileRanges, 3);
 
-    // Load 200 + 3 hfiles to t2_syncup.
+    // Load 100 + 100 + 3 hfiles to t2_syncup.
+    hfileRanges =
+      new byte[][][] { new byte[][] { 
Bytes.toBytes(randomHFileRangeListIterator.next()),
+        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, 
ht2Source, hfileRanges, 100);
+
     hfileRanges =
       new byte[][][] { new byte[][] { 
Bytes.toBytes(randomHFileRangeListIterator.next()),
         Bytes.toBytes(randomHFileRangeListIterator.next()) } };
-    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, 
ht2Source, hfileRanges, 200);
+    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, 
FAMILY, ht2Source,
+      hfileRanges, 100);
 
     hfileRanges =
       new byte[][][] { new byte[][] { 
Bytes.toBytes(randomHFileRangeListIterator.next()),
@@ -229,6 +244,26 @@ public class TestReplicationSyncUpToolWithBulkLoadedData 
extends TestReplication
     loader.bulkLoad(tableName, dir);
   }
 
+  private void loadFromOtherHDFSAndValidateHFileReplication(String testName, 
byte[] row, byte[] fam,
+    Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
+    Path dir = UTIL2.getDataTestDirOnTestFS(testName);
+    FileSystem fs = UTIL2.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(fam));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      HFileTestUtil.createHFile(UTIL2.getConfiguration(), fs,
+        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, 
numOfRows);
+    }
+
+    final TableName tableName = source.getName();
+    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
+    loader.bulkLoad(tableName, dir);
+  }
+
   private void wait(Table target, int expectedCount, String msg)
     throws IOException, InterruptedException {
     for (int i = 0; i < NB_RETRIES; i++) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index b3cc6ab1092..6c46be96ca9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ByteBufferExtendedCell;
 import org.apache.hadoop.hbase.CacheEvictionStats;
@@ -2416,7 +2417,7 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler, AdminService.Blockin
             filePaths.add(familyPath.getPath());
           }
           // Check if the batch of files exceeds the current quota
-          sizeToBeLoaded = 
enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
+          sizeToBeLoaded = 
enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
         }
       }
 
@@ -2521,6 +2522,15 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler, AdminService.Blockin
     }
   }
 
+  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
+    if (filePaths.isEmpty()) {
+      // local hdfs
+      return regionServer.getFileSystem();
+    }
+    // source hdfs
+    return new 
Path(filePaths.get(0)).getFileSystem(regionServer.getConfiguration());
+  }
+
   private com.google.protobuf.Message execServiceOnRegion(HRegion region,
     final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
     // ignore the passed in controller (from the serialized call)

Reply via email to