This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
     new 4dfcd1bb0ac  HBASE-22335 do add hfile ref only when replication_scope is 1 (#6955)
4dfcd1bb0ac is described below

commit 4dfcd1bb0ac926b55f44f1f1712f2c33981f626f
Author: Chandra Sekhar K <chandrasekhar1...@gmail.com>
AuthorDate: Tue Jul 1 16:14:49 2025 +0530

    HBASE-22335 do add hfile ref only when replication_scope is 1 (#6955)

    Co-authored-by: chenxu14 <chenxu14@172.18.167.134>
    Signed-off-by: Duo Zhang <zhang...@apache.org>
    (cherry picked from commit e3aacaf62e2094b402a6624b2627b4b1a2323836)
---
 .../regionserver/ReplicationObserver.java          |  16 ++-
 .../regionserver/TestBulkLoadReplication.java      | 135 +++++++++++++++++----
 2 files changed, 124 insertions(+), 27 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
index b762fb94e8a..a1e2f0d6dec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
@@ -23,6 +23,7 @@ import java.util.Optional;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -65,11 +67,23 @@ public class ReplicationObserver implements RegionCoprocessor, RegionObserver {
         + "data replication.");
       return;
     }
+    TableName tableName = env.getRegionInfo().getTable();
+    if (
+      env.getRegion().getTableDescriptor().getColumnFamily(family).getScope()
+          != HConstants.REPLICATION_SCOPE_GLOBAL
+    ) {
+      LOG
+        .debug("Skipping recording bulk load entries in preCommitStoreFile for table:{}, family:{},"
+          + " Because the replication is not enabled", tableName, Bytes.toString(family));
+      return;
+    }
+
     // This is completely cheating AND getting a HRegionServer from a RegionServerEnvironment is
     // just going to break. This is all private. Not allowed. Regions shouldn't assume they are
     // hosted in a RegionServer. TODO: fix.
     RegionServerServices rss = ((HasRegionServerServices) env).getRegionServerServices();
     Replication rep = (Replication) ((HRegionServer) rss).getReplicationSourceService();
-    rep.addHFileRefsToQueue(env.getRegionInfo().getTable(), family, pairs);
+
+    rep.addHFileRefsToQueue(tableName, family, pairs);
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index 127ebcaabfe..55df08e1ef1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -20,12 +20,15 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -60,6 +63,8 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -67,7 +72,6 @@ import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -121,6 +125,10 @@ public class TestBulkLoadReplication extends TestReplicationBase {
   @ClassRule
   public static TemporaryFolder testFolder = new TemporaryFolder();
 
+  private static ReplicationQueueStorage queueStorage;
+
+  private static boolean replicationPeersAdded = false;
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
@@ -129,6 +137,8 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     setupConfig(UTIL3, "/3");
     TestReplicationBase.setUpBeforeClass();
     startThirdCluster();
+    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getConnection(),
+      UTIL1.getConfiguration());
   }
 
   private static void startThirdCluster() throws Exception {
@@ -152,22 +162,27 @@ public class TestBulkLoadReplication extends TestReplicationBase {
   @Before
   @Override
   public void setUpBase() throws Exception {
-    // "super.setUpBase()" already sets replication from 1->2,
-    // then on the subsequent lines, sets 2->1, 2->3 and 3->2.
-    // So we have following topology: "1 <-> 2 <->3"
-    super.setUpBase();
-    ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
-    ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
-    ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
-    // adds cluster1 as a remote peer on cluster2
-    UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
-    // adds cluster3 as a remote peer on cluster2
-    UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
-    // adds cluster2 as a remote peer on cluster3
-    UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
-    setupCoprocessor(UTIL1);
-    setupCoprocessor(UTIL2);
-    setupCoprocessor(UTIL3);
+    // removing the peer and adding again causing the previously completed bulk load jobs getting
+    // submitted again, adding a check to add the peers only once.
+    if (!replicationPeersAdded) {
+      // "super.setUpBase()" already sets replication from 1->2,
+      // then on the subsequent lines, sets 2->1, 2->3 and 3->2.
+      // So we have following topology: "1 <-> 2 <->3"
+      super.setUpBase();
+      ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
+      ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
+      ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
+      // adds cluster1 as a remote peer on cluster2
+      UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
+      // adds cluster3 as a remote peer on cluster2
+      UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
+      // adds cluster2 as a remote peer on cluster3
+      UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
+      setupCoprocessor(UTIL1);
+      setupCoprocessor(UTIL2);
+      setupCoprocessor(UTIL3);
+      replicationPeersAdded = true;
+    }
     BULK_LOADS_COUNT = new AtomicInteger(0);
   }
 
@@ -195,15 +210,6 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     });
   }
 
-  @After
-  @Override
-  public void tearDownBase() throws Exception {
-    super.tearDownBase();
-    UTIL2.getAdmin().removeReplicationPeer(PEER_ID1);
-    UTIL2.getAdmin().removeReplicationPeer(PEER_ID3);
-    UTIL3.getAdmin().removeReplicationPeer(PEER_ID2);
-  }
-
   protected static void setupBulkLoadConfigsForCluster(Configuration config,
     String clusterReplicationId) throws Exception {
     config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
@@ -322,4 +328,81 @@ public class TestBulkLoadReplication extends TestReplicationBase {
       });
     }
   }
+
+  @Test
+  public void testBulkloadReplicationActiveActiveForNoRepFamily() throws Exception {
+    Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
+    byte[] row = Bytes.toBytes("004");
+    byte[] value = Bytes.toBytes("v4");
+    assertBulkLoadConditionsForNoRepFamily(row, value, UTIL1, peer1TestTable, peer2TestTable,
+      peer3TestTable);
+    // additional wait to make sure no extra bulk load happens
+    Thread.sleep(400);
+    assertEquals(1, BULK_LOADS_COUNT.get());
+    assertEquals(0, queueStorage.getAllHFileRefs().size());
+  }
+
+  private void assertBulkLoadConditionsForNoRepFamily(byte[] row, byte[] value,
+    HBaseTestingUtil utility, Table... tables) throws Exception {
+    BULK_LOAD_LATCH = new CountDownLatch(1);
+    bulkLoadOnClusterForNoRepFamily(row, value, utility);
+    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
+    assertTableHasValue(tables[0], row, value);
+    assertTableNotHasValue(tables[1], row, value);
+    assertTableNotHasValue(tables[2], row, value);
+  }
+
+  private void bulkLoadOnClusterForNoRepFamily(byte[] row, byte[] value, HBaseTestingUtil cluster)
+    throws Exception {
+    String bulkloadFile = createHFileForNoRepFamilies(row, value, cluster.getConfiguration());
+    Path bulkLoadFilePath = new Path(bulkloadFile);
+    copyToHdfsForNoRepFamily(bulkloadFile, cluster.getDFSCluster());
+    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
+    Map<byte[], List<Path>> family2Files = new HashMap<>();
+    List<Path> files = new ArrayList<>();
+    files.add(new Path(
+      BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/" + bulkLoadFilePath.getName()));
+    family2Files.put(noRepfamName, files);
+    bulkLoadHFilesTool.bulkLoad(tableName, family2Files);
+  }
+
+  private String createHFileForNoRepFamilies(byte[] row, byte[] value, Configuration clusterConfig)
+    throws IOException {
+    ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+    cellBuilder.setRow(row).setFamily(TestReplicationBase.noRepfamName)
+      .setQualifier(Bytes.toBytes("1")).setValue(value).setType(Cell.Type.Put);
+
+    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
+    // TODO We need a way to do this without creating files
+    File hFileLocation = testFolder.newFile();
+    FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
+    try {
+      hFileFactory.withOutputStream(out);
+      hFileFactory.withFileContext(new HFileContextBuilder().build());
+      HFile.Writer writer = hFileFactory.create();
+      try {
+        writer.append(new KeyValue(cellBuilder.build()));
+      } finally {
+        writer.close();
+      }
+    } finally {
+      out.close();
+    }
+    return hFileLocation.getAbsoluteFile().getAbsolutePath();
+  }
+
+  private void copyToHdfsForNoRepFamily(String bulkLoadFilePath, MiniDFSCluster cluster)
+    throws Exception {
+    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/");
+    cluster.getFileSystem().mkdirs(bulkLoadDir);
+    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
+  }
+
+  private void assertTableNotHasValue(Table table, byte[] row, byte[] value) throws IOException {
+    Get get = new Get(row);
+    Result result = table.get(get);
+    assertNotEquals(Bytes.toString(value), Bytes.toString(result.value()));
+  }
 }
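For context on the guard this commit adds: whether a bulk-loaded HFile is queued for replication now depends only on the column family's replication scope. A minimal sketch (not part of this commit; the table name "demo" and family names are illustrative) of how a family gets scope 1 (REPLICATION_SCOPE_GLOBAL) versus the default 0, using the public client descriptor API:

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.client.TableDescriptor;
    import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ReplicationScopeSketch {
      public static void main(String[] args) {
        // Family "replicated" opts in to replication: scope 1 (REPLICATION_SCOPE_GLOBAL).
        // Bulk loads into it are recorded as HFile refs by ReplicationObserver.
        ColumnFamilyDescriptor replicated =
          ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("replicated"))
            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
        // Family "local_only" keeps the default scope 0 (REPLICATION_SCOPE_LOCAL).
        // After this fix, bulk loads into it no longer enqueue HFile refs.
        ColumnFamilyDescriptor localOnly =
          ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("local_only"));

        TableDescriptor table = TableDescriptorBuilder.newBuilder(TableName.valueOf("demo"))
          .setColumnFamily(replicated).setColumnFamily(localOnly).build();

        // The patched check in preCommitStoreFile reduces to this per-family predicate:
        for (ColumnFamilyDescriptor cfd : table.getColumnFamilies()) {
          boolean queued = cfd.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL;
          System.out.println(Bytes.toString(cfd.getName()) + " -> queue HFile refs: " + queued);
        }
      }
    }

The same per-family toggle is what the HBase shell sets via REPLICATION_SCOPE, e.g. alter 'demo', {NAME => 'replicated', REPLICATION_SCOPE => '1'}.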