This is an automated email from the ASF dual-hosted git repository.
cconnell pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new 376d8fd49c2 HBASE-29796 Allow sleepForRetry replication config to be
overridden by replication peers (#7577)
376d8fd49c2 is described below
commit 376d8fd49c2822ec1b9dd5e4634ad0885cfed24b
Author: Siddharth Khillon <[email protected]>
AuthorDate: Tue Jan 6 13:25:45 2026 -0800
HBASE-29796 Allow sleepForRetry replication config to be overridden by
replication peers (#7577)
Co-authored-by: skhillon <[email protected]>
Signed-off-by: <[email protected]>
---
.../regionserver/PeerProcedureHandlerImpl.java | 29 +++++
.../regionserver/ReplicationSource.java | 4 +
.../regionserver/ReplicationSourceManager.java | 14 ++-
.../regionserver/ReplicationSourceShipper.java | 4 +
.../regionserver/ReplicationSourceWALReader.java | 4 +
.../regionserver/TestPeerProcedureHandlerImpl.java | 130 +++++++++++++++++++++
.../regionserver/TestReplicationSourceManager.java | 100 ++++++++++++++++
7 files changed, 284 insertions(+), 1 deletion(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index cd3db44d8fa..170eef36e0d 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.util.Map;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.LogRoller;
@@ -108,6 +109,33 @@ public class PeerProcedureHandlerImpl implements
PeerProcedureHandler {
refreshPeerState(peerId);
}
+ private boolean hasReplicationConfigChange(ReplicationPeerConfig oldConfig,
+ ReplicationPeerConfig newConfig) {
+ Map<String, String> oldReplicationConfigs = oldConfig.getConfiguration();
+ Map<String, String> newReplicationConfigs = newConfig.getConfiguration();
+
+ // Check if any replication.source.* keys have changed values
+ for (Map.Entry<String, String> entry : newReplicationConfigs.entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith("replication.source.")) {
+ String oldValue = oldReplicationConfigs.get(key);
+ String newValue = entry.getValue();
+ if (!newValue.equals(oldValue)) {
+ return true;
+ }
+ }
+ }
+
+ // Check if any replication.source.* keys were removed
+ for (String key : oldReplicationConfigs.keySet()) {
+ if (key.startsWith("replication.source.") &&
!newReplicationConfigs.containsKey(key)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
@Override
public void updatePeerConfig(String peerId) throws ReplicationException,
IOException {
Lock peerLock = peersLock.acquireLock(peerId);
@@ -131,6 +159,7 @@ public class PeerProcedureHandlerImpl implements
PeerProcedureHandler {
if (
!ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig)
|| oldConfig.isSerial() != newConfig.isSerial()
+ || hasReplicationConfigChange(oldConfig, newConfig)
|| (oldState.equals(PeerState.ENABLED) &&
newState.equals(PeerState.DISABLED))
) {
replicationSourceManager.refreshSources(peerId);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index dc17ed12ff0..4e122ef5e8b 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -862,4 +862,8 @@ public class ReplicationSource implements
ReplicationSourceInterface {
public long getTotalReplicatedEdits() {
return totalReplicatedEdits.get();
}
+
+ /**
+ * Package-private accessor for this source's {@code sleepForRetries} field.
+ * @return the current value of {@code sleepForRetries}
+ */
+ long getSleepForRetries() {
+ return sleepForRetries;
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index ffaabe7e339..e67c27d86a6 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -47,6 +47,7 @@ import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
@@ -318,7 +319,18 @@ public class ReplicationSourceManager {
WALFileLengthProvider walFileLengthProvider =
this.walFactory.getWALProvider() != null
? this.walFactory.getWALProvider().getWALFileLengthProvider()
: p -> OptionalLong.empty();
- src.init(conf, fs, this, queueStorage, replicationPeer, server, queueData,
clusterId,
+
+ // Create merged configuration with peer overrides as higher priority and
+ // global config as lower priority
+ Configuration mergedConf = conf;
+ if (!replicationPeer.getPeerConfig().getConfiguration().isEmpty()) {
+ CompoundConfiguration compound = new CompoundConfiguration();
+ compound.add(conf);
+
compound.addStringMap(replicationPeer.getPeerConfig().getConfiguration());
+ mergedConf = compound;
+ }
+
+ src.init(mergedConf, fs, this, queueStorage, replicationPeer, server,
queueData, clusterId,
walFileLengthProvider, new MetricsSource(queueData.getId().toString()));
return src;
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 4709e607fc7..d05e4fed045 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -355,4 +355,8 @@ public class ReplicationSourceShipper extends Thread {
totalReleasedBytes);
}
}
+
+ /**
+ * Package-private accessor for this shipper's {@code sleepForRetries} field.
+ * @return the current value of {@code sleepForRetries}
+ */
+ long getSleepForRetries() {
+ return sleepForRetries;
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index fe983c9f3ae..e421042614c 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -436,4 +436,8 @@ class ReplicationSourceWALReader extends Thread {
private ReplicationSourceManager getSourceManager() {
return this.source.getSourceManager();
}
+
+ /**
+ * Package-private accessor for this WAL reader's {@code sleepForRetries} field.
+ * @return the current value of {@code sleepForRetries}
+ */
+ long getSleepForRetries() {
+ return sleepForRetries;
+ }
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestPeerProcedureHandlerImpl.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestPeerProcedureHandlerImpl.java
new file mode 100644
index 00000000000..5a3f2bd1eeb
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestPeerProcedureHandlerImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for when {@link PeerProcedureHandlerImpl#updatePeerConfig(String)} asks the
+ * {@link ReplicationSourceManager} to refresh a peer's sources. The manager, the peer registry
+ * and the peer itself are Mockito mocks; each test only varies the peer's configuration map
+ * between the "old" and "new" {@link ReplicationPeerConfig}.
+ */
+@Tag(ReplicationTests.TAG)
+@Tag(SmallTests.TAG)
+public class TestPeerProcedureHandlerImpl {
+
+ private ReplicationSourceManager mockSourceManager;
+ private ReplicationPeers mockReplicationPeers;
+ private ReplicationPeerImpl mockPeer;
+ private PeerProcedureHandlerImpl handler;
+ private static final String PEER_ID = "testPeer";
+
+ /**
+ * Builds fresh mocks before each test: the manager hands out the mocked peer registry, and the
+ * registry returns the mocked peer for {@link #PEER_ID}. The handler under test is constructed
+ * with the mocked manager and {@code null} as its second constructor argument.
+ */
+ @BeforeEach
+ public void setup() throws Exception {
+ mockSourceManager = mock(ReplicationSourceManager.class);
+ mockReplicationPeers = mock(ReplicationPeers.class);
+ mockPeer = mock(ReplicationPeerImpl.class);
+
+
when(mockSourceManager.getReplicationPeers()).thenReturn(mockReplicationPeers);
+ when(mockReplicationPeers.getPeer(PEER_ID)).thenReturn(mockPeer);
+
+ handler = new PeerProcedureHandlerImpl(mockSourceManager, null);
+ }
+
+ /**
+ * Changing the value of an existing {@code replication.source.sleepforretries} entry
+ * (1000 to 5000), with cluster key and peer state unchanged, must trigger exactly one
+ * {@code refreshSources} call for the peer.
+ */
+ @Test
+ public void testReplicationSourceConfigChangeTriggers() throws Exception {
+ ReplicationPeerConfig oldConfig =
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+ .putConfiguration("replication.source.sleepforretries", "1000").build();
+
+ ReplicationPeerConfig newConfig =
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+ .putConfiguration("replication.source.sleepforretries", "5000").build();
+
+ when(mockPeer.getPeerConfig()).thenReturn(oldConfig);
+ when(mockPeer.getPeerState()).thenReturn(PeerState.ENABLED);
+
when(mockReplicationPeers.refreshPeerConfig(PEER_ID)).thenReturn(newConfig);
+
when(mockReplicationPeers.refreshPeerState(PEER_ID)).thenReturn(PeerState.ENABLED);
+
+ handler.updatePeerConfig(PEER_ID);
+
+ verify(mockSourceManager, times(1)).refreshSources(PEER_ID);
+ }
+
+ /**
+ * Changing a configuration entry whose key does not start with {@code replication.source.}
+ * must not trigger any {@code refreshSources} call.
+ */
+ @Test
+ public void testNonReplicationSourceConfigDoesNotTrigger() throws Exception {
+ ReplicationPeerConfig oldConfig =
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+ .putConfiguration("some.other.config", "value1").build();
+
+ ReplicationPeerConfig newConfig =
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+ .putConfiguration("some.other.config", "value2").build();
+
+ when(mockPeer.getPeerConfig()).thenReturn(oldConfig);
+ when(mockPeer.getPeerState()).thenReturn(PeerState.ENABLED);
+
when(mockReplicationPeers.refreshPeerConfig(PEER_ID)).thenReturn(newConfig);
+
when(mockReplicationPeers.refreshPeerState(PEER_ID)).thenReturn(PeerState.ENABLED);
+
+ handler.updatePeerConfig(PEER_ID);
+
+ verify(mockSourceManager, never()).refreshSources(anyString());
+ }
+
+ /**
+ * Adding a {@code replication.source.sleepforretries} entry that the old config did not have
+ * must trigger exactly one {@code refreshSources} call for the peer.
+ */
+ @Test
+ public void testNewReplicationSourceConfigTriggers() throws Exception {
+ ReplicationPeerConfig oldConfig =
+ ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster").build();
+
+ ReplicationPeerConfig newConfig =
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+ .putConfiguration("replication.source.sleepforretries", "5000").build();
+
+ when(mockPeer.getPeerConfig()).thenReturn(oldConfig);
+ when(mockPeer.getPeerState()).thenReturn(PeerState.ENABLED);
+
when(mockReplicationPeers.refreshPeerConfig(PEER_ID)).thenReturn(newConfig);
+
when(mockReplicationPeers.refreshPeerState(PEER_ID)).thenReturn(PeerState.ENABLED);
+
+ handler.updatePeerConfig(PEER_ID);
+
+ verify(mockSourceManager, times(1)).refreshSources(PEER_ID);
+ }
+
+ /**
+ * Removing a {@code replication.source.sleepforretries} entry that the old config had must
+ * trigger exactly one {@code refreshSources} call for the peer.
+ */
+ @Test
+ public void testRemovedReplicationSourceConfigTriggers() throws Exception {
+ ReplicationPeerConfig oldConfig =
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+ .putConfiguration("replication.source.sleepforretries", "2000").build();
+
+ ReplicationPeerConfig newConfig =
+ ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster").build();
+
+ when(mockPeer.getPeerConfig()).thenReturn(oldConfig);
+ when(mockPeer.getPeerState()).thenReturn(PeerState.ENABLED);
+
when(mockReplicationPeers.refreshPeerConfig(PEER_ID)).thenReturn(newConfig);
+
when(mockReplicationPeers.refreshPeerState(PEER_ID)).thenReturn(PeerState.ENABLED);
+
+ handler.updatePeerConfig(PEER_ID);
+
+ verify(mockSourceManager, times(1)).refreshSources(PEER_ID);
+ }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 663b444dc4e..aadf5982b68 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
+import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
@@ -373,4 +374,103 @@ public class TestReplicationSourceManager {
manager.cleanOldLogs(walName, true, source);
assertFalse(FS.exists(remoteWAL));
}
+
+ /**
+ * Adds a peer whose config sets {@code replication.source.sleepforretries} to 5000 while the
+ * configuration returned by {@code UTIL.getConfiguration()} is set to 1000, then asserts that
+ * the peer's {@link ReplicationSource} reports the peer value. When worker threads exist, the
+ * first shipper and that shipper's {@code entryReader} are checked as well.
+ * <p>
+ * NOTE(review): the value written to {@code UTIL.getConfiguration()} is not reset within this
+ * method; confirm other tests sharing that configuration do not depend on the previous value.
+ */
+ @Test
+ public void testPeerConfigurationOverridesPropagate() throws Exception {
+ Configuration globalConf = UTIL.getConfiguration();
+ long globalSleepValue = 1000L;
+ globalConf.setLong("replication.source.sleepforretries", globalSleepValue);
+
+ long peerSleepOverride = 5000L;
+ String peerId = "testConfigOverridePeer";
+ String clusterKey = "testPeerConfigOverride";
+
+ ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+ .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" +
clusterKey)
+ .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName())
+ .putConfiguration("replication.source.sleepforretries",
String.valueOf(peerSleepOverride))
+ .build();
+
+ manager.getReplicationPeers().getPeerStorage().addPeer(peerId, peerConfig,
true,
+ SyncReplicationState.NONE);
+ manager.addPeer(peerId);
+ // Poll for up to 20 seconds until the manager exposes an active source for the peer.
+ UTIL.waitFor(20000, () -> {
+ ReplicationSourceInterface rs = manager.getSource(peerId);
+ return rs != null && rs.isSourceActive();
+ });
+
+ ReplicationSource source = (ReplicationSource)
manager.getSources().stream()
+ .filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null);
+ assertNotNull("Source should be created for peer", source);
+
+ assertEquals("ReplicationSource should use peer config override for
sleepForRetries",
+ peerSleepOverride, source.getSleepForRetries());
+
+ Map<String, ReplicationSourceShipper> workers = source.workerThreads;
+ // NOTE(review): the shipper and reader assertions below are skipped when no worker thread
+ // (or no entryReader) exists, so this part of the test can pass without checking anything.
+ if (!workers.isEmpty()) {
+ ReplicationSourceShipper shipper = workers.values().iterator().next();
+ assertEquals("ReplicationSourceShipper should use peer config override
for sleepForRetries",
+ peerSleepOverride, shipper.getSleepForRetries());
+
+ ReplicationSourceWALReader reader = shipper.entryReader;
+ if (reader != null) {
+ assertEquals(
+ "ReplicationSourceWALReader should use peer config override for
sleepForRetries",
+ peerSleepOverride, reader.getSleepForRetries());
+ }
+ }
+
+ removePeerAndWait(peerId);
+ }
+
+ /**
+ * Registers two peers side by side: one whose config sets
+ * {@code replication.source.sleepforretries} to 5000, and one added through
+ * {@code addPeerAndWait} without that override. Asserts that the first source reports the peer
+ * value and the second reports the 1000 set on {@code UTIL.getConfiguration()}, i.e. that one
+ * peer's override does not affect the other.
+ * <p>
+ * NOTE(review): the value written to {@code UTIL.getConfiguration()} is not reset within this
+ * method, and the two {@code removePeerAndWait} calls at the end do not run if an earlier
+ * assertion fails.
+ */
+ @Test
+ public void testPeerConfigurationIsolation() throws Exception {
+ Configuration globalConf = UTIL.getConfiguration();
+ long globalSleepValue = 1000L;
+ globalConf.setLong("replication.source.sleepforretries", globalSleepValue);
+
+ // Create first peer WITH config override
+ long peerSleepOverride = 5000L;
+ String peerIdWithOverride = "peerWithOverride";
+ String clusterKeyWithOverride = "testPeerWithOverride";
+
+ ReplicationPeerConfig configWithOverride =
ReplicationPeerConfig.newBuilder()
+ .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" +
clusterKeyWithOverride)
+ .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName())
+ .putConfiguration("replication.source.sleepforretries",
String.valueOf(peerSleepOverride))
+ .build();
+
+ manager.getReplicationPeers().getPeerStorage().addPeer(peerIdWithOverride,
configWithOverride,
+ true, SyncReplicationState.NONE);
+ manager.addPeer(peerIdWithOverride);
+
+ // Create second peer WITHOUT config override
+ String peerIdWithoutOverride = "peerWithoutOverride";
+ String clusterKeyWithoutOverride = "testPeerWithoutOverride";
+ addPeerAndWait(peerIdWithoutOverride, clusterKeyWithoutOverride, false);
+
+ // Wait for both peers to be active
+ UTIL.waitFor(20000, () -> {
+ ReplicationSourceInterface rs1 = manager.getSource(peerIdWithOverride);
+ ReplicationSourceInterface rs2 =
manager.getSource(peerIdWithoutOverride);
+ return rs1 != null && rs1.isSourceActive() && rs2 != null &&
rs2.isSourceActive();
+ });
+
+ // Verify peer with override uses the override value
+ ReplicationSource sourceWithOverride = (ReplicationSource)
manager.getSources().stream()
+ .filter(s ->
s.getPeerId().equals(peerIdWithOverride)).findFirst().orElse(null);
+ assertNotNull("Source with override should be created",
sourceWithOverride);
+ assertEquals("Peer with override should use override value",
peerSleepOverride,
+ sourceWithOverride.getSleepForRetries());
+
+ // Verify peer without override uses global config
+ ReplicationSource sourceWithoutOverride = (ReplicationSource)
manager.getSources().stream()
+ .filter(s ->
s.getPeerId().equals(peerIdWithoutOverride)).findFirst().orElse(null);
+ assertNotNull("Source without override should be created",
sourceWithoutOverride);
+ assertEquals("Peer without override should use global config",
globalSleepValue,
+ sourceWithoutOverride.getSleepForRetries());
+
+ removePeerAndWait(peerIdWithOverride);
+ removePeerAndWait(peerIdWithoutOverride);
+ }
}