This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 035ca323c46 HBASE-22335 do add hfile ref only when replication_scope is 1 (#7115)
035ca323c46 is described below
commit 035ca323c4652606e989a8a7bc745660fab1762d
Author: Chandra Sekhar K <[email protected]>
AuthorDate: Tue Jul 1 16:15:06 2025 +0530
HBASE-22335 do add hfile ref only when replication_scope is 1 (#7115)
Signed-off-by: Duo Zhang <[email protected]>
(cherry picked from commit ddf9566411c92802b32c4c6420a5cea5f8f5e542)
---
.../regionserver/ReplicationObserver.java | 16 ++-
.../regionserver/TestBulkLoadReplication.java | 138 +++++++++++++++++----
2 files changed, 127 insertions(+), 27 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
index 541021f4d5d..eec0f92d196 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationObserver.java
@@ -23,6 +23,7 @@ import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -65,11 +67,23 @@ public class ReplicationObserver implements RegionCoprocessor, RegionObserver {
+ "data replication.");
return;
}
+ TableName tableName = env.getRegionInfo().getTable();
+ if (
+ env.getRegion().getTableDescriptor().getColumnFamily(family).getScope()
+ != HConstants.REPLICATION_SCOPE_GLOBAL
+ ) {
+ LOG.debug(
+ "Skipping recording bulk load entries in preCommitStoreFile for table:{}, family:{}, because replication is not enabled",
+ tableName, Bytes.toString(family));
+ return;
+ }
+
// This is completely cheating AND getting a HRegionServer from a RegionServerEnvironment is
// just going to break. This is all private. Not allowed. Regions shouldn't assume they are
// hosted in a RegionServer. TODO: fix.
RegionServerServices rss = ((HasRegionServerServices) env).getRegionServerServices();
Replication rep = (Replication) ((HRegionServer) rss).getReplicationSourceService();
- rep.addHFileRefsToQueue(env.getRegionInfo().getTable(), family, pairs);
+
+ rep.addHFileRefsToQueue(tableName, family, pairs);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index cd8c046767f..9e3759311c3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -38,6 +41,8 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.ExtendedCellBuilder;
+import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@@ -59,6 +64,8 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -66,7 +73,6 @@ import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -118,6 +124,10 @@ public class TestBulkLoadReplication extends TestReplicationBase {
@ClassRule
public static TemporaryFolder testFolder = new TemporaryFolder();
+ private static ReplicationQueueStorage queueStorage;
+
+ private static boolean replicationPeersAdded = false;
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
@@ -126,6 +136,8 @@ public class TestBulkLoadReplication extends TestReplicationBase {
setupConfig(UTIL3, "/3");
TestReplicationBase.setUpBeforeClass();
startThirdCluster();
+ queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getZooKeeperWatcher(),
+ UTIL1.getConfiguration());
}
private static void startThirdCluster() throws Exception {
@@ -148,22 +160,28 @@ public class TestBulkLoadReplication extends TestReplicationBase {
@Before
@Override
public void setUpBase() throws Exception {
- // "super.setUpBase()" already sets replication from 1->2,
- // then on the subsequent lines, sets 2->1, 2->3 and 3->2.
- // So we have following topology: "1 <-> 2 <->3"
- super.setUpBase();
- ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
- ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
- ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
- // adds cluster1 as a remote peer on cluster2
- UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
- // adds cluster3 as a remote peer on cluster2
- UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
- // adds cluster2 as a remote peer on cluster3
- UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
- setupCoprocessor(UTIL1);
- setupCoprocessor(UTIL2);
- setupCoprocessor(UTIL3);
+ // Removing the peers and adding them again causes previously completed bulk load jobs to be
+ // submitted again, so add the peers only once.
+ if (!replicationPeersAdded) {
+ // "super.setUpBase()" already sets replication from 1->2,
+ // then on the subsequent lines, sets 2->1, 2->3 and 3->2.
+ // So we have following topology: "1 <-> 2 <->3"
+ super.setUpBase();
+ ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
+ ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
+ ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
+ // adds cluster1 as a remote peer on cluster2
+ UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
+ // adds cluster3 as a remote peer on cluster2
+ UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
+ // adds cluster2 as a remote peer on cluster3
+ UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
+ setupCoprocessor(UTIL1);
+ setupCoprocessor(UTIL2);
+ setupCoprocessor(UTIL3);
+ replicationPeersAdded = true;
+ }
+
BULK_LOADS_COUNT = new AtomicInteger(0);
}
@@ -190,15 +208,6 @@ public class TestBulkLoadReplication extends TestReplicationBase {
});
}
- @After
- @Override
- public void tearDownBase() throws Exception {
- super.tearDownBase();
- UTIL2.getAdmin().removeReplicationPeer(PEER_ID1);
- UTIL2.getAdmin().removeReplicationPeer(PEER_ID3);
- UTIL3.getAdmin().removeReplicationPeer(PEER_ID2);
- }
-
protected static void setupBulkLoadConfigsForCluster(Configuration config,
String clusterReplicationId) throws Exception {
config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
@@ -317,4 +326,81 @@ public class TestBulkLoadReplication extends TestReplicationBase {
});
}
}
+
+ @Test
+ public void testBulkloadReplicationActiveActiveForNoRepFamily() throws Exception {
+ Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
+ Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
+ Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
+ byte[] row = Bytes.toBytes("004");
+ byte[] value = Bytes.toBytes("v4");
+ assertBulkLoadConditionsForNoRepFamily(row, value, UTIL1, peer1TestTable, peer2TestTable,
+ peer3TestTable);
+ // additional wait to make sure no extra bulk load happens
+ Thread.sleep(400);
+ assertEquals(1, BULK_LOADS_COUNT.get());
+ assertEquals(0, queueStorage.getAllHFileRefs().size());
+ }
+
+ private void assertBulkLoadConditionsForNoRepFamily(byte[] row, byte[] value,
+ HBaseTestingUtility utility, Table... tables) throws Exception {
+ BULK_LOAD_LATCH = new CountDownLatch(1);
+ bulkLoadOnClusterForNoRepFamily(row, value, utility);
+ assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
+ assertTableHasValue(tables[0], row, value);
+ assertTableNotHasValue(tables[1], row, value);
+ assertTableNotHasValue(tables[2], row, value);
+ }
+
+ private void bulkLoadOnClusterForNoRepFamily(byte[] row, byte[] value,
+ HBaseTestingUtility cluster) throws Exception {
+ String bulkloadFile = createHFileForNoRepFamilies(row, value, cluster.getConfiguration());
+ Path bulkLoadFilePath = new Path(bulkloadFile);
+ copyToHdfsForNoRepFamily(bulkloadFile, cluster.getDFSCluster());
+ BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
+ Map<byte[], List<Path>> family2Files = new HashMap<>();
+ List<Path> files = new ArrayList<>();
+ files.add(new Path(
+ BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/" + bulkLoadFilePath.getName()));
+ family2Files.put(noRepfamName, files);
+ bulkLoadHFilesTool.bulkLoad(tableName, family2Files);
+ }
+
+ private String createHFileForNoRepFamilies(byte[] row, byte[] value, Configuration clusterConfig)
+ throws IOException {
+ ExtendedCellBuilder cellBuilder = ExtendedCellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+ cellBuilder.setRow(row).setFamily(TestReplicationBase.noRepfamName)
+ .setQualifier(Bytes.toBytes("1")).setValue(value).setType(Cell.Type.Put);
+
+ HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
+ // TODO We need a way to do this without creating files
+ File hFileLocation = testFolder.newFile();
+ FSDataOutputStream out = new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
+ try {
+ hFileFactory.withOutputStream(out);
+ hFileFactory.withFileContext(new HFileContextBuilder().build());
+ HFile.Writer writer = hFileFactory.create();
+ try {
+ writer.append(new KeyValue(cellBuilder.build()));
+ } finally {
+ writer.close();
+ }
+ } finally {
+ out.close();
+ }
+ return hFileLocation.getAbsoluteFile().getAbsolutePath();
+ }
+
+ private void copyToHdfsForNoRepFamily(String bulkLoadFilePath, MiniDFSCluster cluster)
+ throws Exception {
+ Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/" + Bytes.toString(noRepfamName) + "/");
+ cluster.getFileSystem().mkdirs(bulkLoadDir);
+ cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
+ }
+
+ private void assertTableNotHasValue(Table table, byte[] row, byte[] value) throws IOException {
+ Get get = new Get(row);
+ Result result = table.get(get);
+ assertNotEquals(Bytes.toString(value), Bytes.toString(result.value()));
+ }
}
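
For context (not part of the commit): the new preCommitStoreFile check only queues bulk-loaded HFile refs for replication when the target column family's replication scope is 1 (HConstants.REPLICATION_SCOPE_GLOBAL). Below is a minimal sketch of how that scope is typically set when creating a table with the standard HBase 2.x client API; the table and family names ("exampleTable", "repFam", "noRepFam") are hypothetical and only for illustration.

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

public class ReplicationScopeSketch {
  public static void main(String[] args) throws Exception {
    try (Connection connection = ConnectionFactory.createConnection();
      Admin admin = connection.getAdmin()) {
      // "repFam" is replicated: scope 1 (REPLICATION_SCOPE_GLOBAL), so the observer
      // queues its bulk-loaded HFiles. "noRepFam" keeps the default scope 0
      // (REPLICATION_SCOPE_LOCAL) and is now skipped by preCommitStoreFile.
      admin.createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf("exampleTable"))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("repFam"))
          .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("noRepFam")).build())
        .build());
    }
  }
}

A family left at the default scope 0, like noRepfamName in the new test, no longer has its bulk-loaded HFiles queued, which is what testBulkloadReplicationActiveActiveForNoRepFamily asserts via queueStorage.getAllHFileRefs().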