This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 4dfcd1bb0ac HBASE-22335 do add hfile ref only when replication_scope is 1 (#6955)
4dfcd1bb0ac is described below

commit 4dfcd1bb0ac926b55f44f1f1712f2c33981f626f
Author: Chandra Sekhar K <chandrasekhar1...@gmail.com>
AuthorDate: Tue Jul 1 16:14:49 2025 +0530

    HBASE-22335 do add hfile ref only when replication_scope is 1 (#6955)
    
    Co-authored-by: chenxu14 <chenxu14@172.18.167.134>
    
    Signed-off-by: Duo Zhang <zhang...@apache.org>
    (cherry picked from commit e3aacaf62e2094b402a6624b2627b4b1a2323836)
---
 .../regionserver/ReplicationObserver.java          |  16 ++-
 .../regionserver/TestBulkLoadReplication.java      | 135 +++++++++++++++++----
 2 files changed, 124 insertions(+), 27 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
index b762fb94e8a..a1e2f0d6dec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
@@ -23,6 +23,7 @@ import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -65,11 +67,23 @@ public class ReplicationObserver implements RegionCoprocessor, RegionObserver {
         + "data replication.");
       return;
     }
+    TableName tableName = env.getRegionInfo().getTable();
+    if (
+      env.getRegion().getTableDescriptor().getColumnFamily(family).getScope()
+          != HConstants.REPLICATION_SCOPE_GLOBAL
+    ) {
+      LOG
+      .debug("Skipping recording bulk load entries in preCommitStoreFile for table:{}, family:{},"
+        + " Because the replication is not enabled", tableName, Bytes.toString(family));
+      return;
+    }
+
     // This is completely cheating AND getting a HRegionServer from a RegionServerEnvironment is
     // just going to break. This is all private. Not allowed. Regions shouldn't assume they are
     // hosted in a RegionServer. TODO: fix.
     RegionServerServices rss = ((HasRegionServerServices) env).getRegionServerServices();
     Replication rep = (Replication) ((HRegionServer) rss).getReplicationSourceService();
-    rep.addHFileRefsToQueue(env.getRegionInfo().getTable(), family, pairs);
+
+    rep.addHFileRefsToQueue(tableName, family, pairs);
   }
 }
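
In essence, the change above adds a guard so HFile refs are only queued for replication when the bulk-loaded column family has replication scope REPLICATION_SCOPE_GLOBAL (1); otherwise preCommitStoreFile returns early. A minimal sketch of that guard in isolation, assuming a TableDescriptor and the bulk-loaded family are already in hand (the class and method names below are illustrative, not part of the patch):

// Hedged sketch (not part of the patch): the replication-scope guard on its own.
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.TableDescriptor;

public final class ScopeCheckSketch {
  private ScopeCheckSketch() {
  }

  /** Returns true only when the family is replicated globally (scope == 1). */
  public static boolean shouldQueueHFileRefs(TableDescriptor td, byte[] family) {
    // Families with local-only scope never have their bulk-loaded HFiles queued for peers.
    return td.getColumnFamily(family).getScope() == HConstants.REPLICATION_SCOPE_GLOBAL;
  }
}

With a check like this in place, bulk loads against non-replicated families no longer leave stale entries in the HFile refs queue.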
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index 127ebcaabfe..55df08e1ef1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -20,12 +20,15 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -60,6 +63,8 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -67,7 +72,6 @@ import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -121,6 +125,10 @@ public class TestBulkLoadReplication extends TestReplicationBase {
   @ClassRule
   public static TemporaryFolder testFolder = new TemporaryFolder();
 
+  private static ReplicationQueueStorage queueStorage;
+
+  private static boolean replicationPeersAdded = false;
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
@@ -129,6 +137,8 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     setupConfig(UTIL3, "/3");
     TestReplicationBase.setUpBeforeClass();
     startThirdCluster();
+    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getConnection(),
+      UTIL1.getConfiguration());
   }
 
   private static void startThirdCluster() throws Exception {
@@ -152,22 +162,27 @@ public class TestBulkLoadReplication extends TestReplicationBase {
   @Before
   @Override
   public void setUpBase() throws Exception {
-    // "super.setUpBase()" already sets replication from 1->2,
-    // then on the subsequent lines, sets 2->1, 2->3 and 3->2.
-    // So we have following topology: "1 <-> 2 <->3"
-    super.setUpBase();
-    ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
-    ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
-    ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
-    // adds cluster1 as a remote peer on cluster2
-    UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
-    // adds cluster3 as a remote peer on cluster2
-    UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
-    // adds cluster2 as a remote peer on cluster3
-    UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
-    setupCoprocessor(UTIL1);
-    setupCoprocessor(UTIL2);
-    setupCoprocessor(UTIL3);
+    // removing the peer and adding again causing the previously completed bulk load jobs getting
+    // submitted again, adding a check to add the peers only once.
+    if (!replicationPeersAdded) {
+      // "super.setUpBase()" already sets replication from 1->2,
+      // then on the subsequent lines, sets 2->1, 2->3 and 3->2.
+      // So we have following topology: "1 <-> 2 <->3"
+      super.setUpBase();
+      ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
+      ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
+      ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
+      // adds cluster1 as a remote peer on cluster2
+      UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
+      // adds cluster3 as a remote peer on cluster2
+      UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
+      // adds cluster2 as a remote peer on cluster3
+      UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
+      setupCoprocessor(UTIL1);
+      setupCoprocessor(UTIL2);
+      setupCoprocessor(UTIL3);
+      replicationPeersAdded = true;
+    }
     BULK_LOADS_COUNT = new AtomicInteger(0);
   }
 
@@ -195,15 +210,6 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     });
   }
 
-  @After
-  @Override
-  public void tearDownBase() throws Exception {
-    super.tearDownBase();
-    UTIL2.getAdmin().removeReplicationPeer(PEER_ID1);
-    UTIL2.getAdmin().removeReplicationPeer(PEER_ID3);
-    UTIL3.getAdmin().removeReplicationPeer(PEER_ID2);
-  }
-
   protected static void setupBulkLoadConfigsForCluster(Configuration config,
     String clusterReplicationId) throws Exception {
     config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
@@ -322,4 +328,81 @@ public class TestBulkLoadReplication extends TestReplicationBase {
       });
     }
   }
+
+  @Test
+  public void testBulkloadReplicationActiveActiveForNoRepFamily() throws Exception {
+    Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
+    byte[] row = Bytes.toBytes("004");
+    byte[] value = Bytes.toBytes("v4");
+    assertBulkLoadConditionsForNoRepFamily(row, value, UTIL1, peer1TestTable, peer2TestTable,
+      peer3TestTable);
+    // additional wait to make sure no extra bulk load happens
+    Thread.sleep(400);
+    assertEquals(1, BULK_LOADS_COUNT.get());
+    assertEquals(0, queueStorage.getAllHFileRefs().size());
+  }
+
+  private void assertBulkLoadConditionsForNoRepFamily(byte[] row, byte[] value,
+    HBaseTestingUtil utility, Table... tables) throws Exception {
+    BULK_LOAD_LATCH = new CountDownLatch(1);
+    bulkLoadOnClusterForNoRepFamily(row, value, utility);
+    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
+    assertTableHasValue(tables[0], row, value);
+    assertTableNotHasValue(tables[1], row, value);
+    assertTableNotHasValue(tables[2], row, value);
+  }
+
+  private void bulkLoadOnClusterForNoRepFamily(byte[] row, byte[] value, HBaseTestingUtil cluster)
+    throws Exception {
+    String bulkloadFile = createHFileForNoRepFamilies(row, value, cluster.getConfiguration());
+    Path bulkLoadFilePath = new Path(bulkloadFile);
+    copyToHdfsForNoRepFamily(bulkloadFile, cluster.getDFSCluster());
+    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
+    Map<byte[], List<Path>> family2Files = new HashMap<>();
+    List<Path> files = new ArrayList<>();
+    files.add(new Path(
+      BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/" + bulkLoadFilePath.getName()));
+    family2Files.put(noRepfamName, files);
+    bulkLoadHFilesTool.bulkLoad(tableName, family2Files);
+  }
+
+  private String createHFileForNoRepFamilies(byte[] row, byte[] value, Configuration clusterConfig)
+    throws IOException {
+    ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+    cellBuilder.setRow(row).setFamily(TestReplicationBase.noRepfamName)
+      .setQualifier(Bytes.toBytes("1")).setValue(value).setType(Cell.Type.Put);
+
+    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
+    // TODO We need a way to do this without creating files
+    File hFileLocation = testFolder.newFile();
+    FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
+    try {
+      hFileFactory.withOutputStream(out);
+      hFileFactory.withFileContext(new HFileContextBuilder().build());
+      HFile.Writer writer = hFileFactory.create();
+      try {
+        writer.append(new KeyValue(cellBuilder.build()));
+      } finally {
+        writer.close();
+      }
+    } finally {
+      out.close();
+    }
+    return hFileLocation.getAbsoluteFile().getAbsolutePath();
+  }
+
+  private void copyToHdfsForNoRepFamily(String bulkLoadFilePath, MiniDFSCluster cluster)
+    throws Exception {
+    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/");
+    cluster.getFileSystem().mkdirs(bulkLoadDir);
+    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
+  }
+
+  private void assertTableNotHasValue(Table table, byte[] row, byte[] value) throws IOException {
+    Get get = new Get(row);
+    Result result = table.get(get);
+    assertNotEquals(Bytes.toString(value), Bytes.toString(result.value()));
+  }
 }
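
The new test uses ReplicationQueueStorage to confirm that a bulk load against the non-replicated family leaves nothing queued for replication (assertEquals(0, queueStorage.getAllHFileRefs().size())). A minimal sketch of that verification step, assuming a live Connection to the source cluster; the helper class and method names below are illustrative, not part of the patch:

// Hedged sketch (not part of the patch): counting queued HFile refs on the source cluster.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;

public final class HFileRefsCheckSketch {
  private HFileRefsCheckSketch() {
  }

  /** Returns how many HFile refs are currently queued for bulk-load replication. */
  public static int pendingHFileRefs(Connection conn, Configuration conf) throws Exception {
    ReplicationQueueStorage storage =
      ReplicationStorageFactory.getReplicationQueueStorage(conn, conf);
    return storage.getAllHFileRefs().size();
  }
}

After a bulk load into a family whose scope is not global, this count is expected to stay at zero, which is exactly what the added test asserts.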
