This is an automated email from the ASF dual-hosted git repository.

cconnell pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 0715313e81e HBASE-29796 [branch-2] Allow sleepForRetry replication config to be overridden by replication peers (#7578)
0715313e81e is described below

commit 0715313e81e7c3f5d296418cdea1d44717a931a0
Author: Siddharth Khillon <[email protected]>
AuthorDate: Tue Jan 6 13:27:15 2026 -0800

    HBASE-29796 [branch-2] Allow sleepForRetry replication config to be overridden by replication peers (#7578)
    
    Co-authored-by: skhillon <[email protected]>
    Signed-off by: <[email protected]>
---
 .../regionserver/PeerProcedureHandlerImpl.java     |  29 +++++
 .../regionserver/ReplicationSource.java            |   4 +
 .../regionserver/ReplicationSourceManager.java     |  14 ++-
 .../regionserver/ReplicationSourceShipper.java     |   4 +
 .../regionserver/ReplicationSourceWALReader.java   |   4 +
 .../regionserver/TestPeerProcedureHandlerImpl.java | 130 +++++++++++++++++++++
 .../regionserver/TestReplicationSourceManager.java | 120 +++++++++++++++++++
 7 files changed, 304 insertions(+), 1 deletion(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index 429276806f1..b2d5d3e6dea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
+import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -98,6 +99,33 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
     refreshPeerState(peerId);
   }
 
+  private boolean hasReplicationConfigChange(ReplicationPeerConfig oldConfig,
+    ReplicationPeerConfig newConfig) {
+    Map<String, String> oldReplicationConfigs = oldConfig.getConfiguration();
+    Map<String, String> newReplicationConfigs = newConfig.getConfiguration();
+
+    // Check if any replication.source.* keys have changed values
+    for (Map.Entry<String, String> entry : newReplicationConfigs.entrySet()) {
+      String key = entry.getKey();
+      if (key.startsWith("replication.source.")) {
+        String oldValue = oldReplicationConfigs.get(key);
+        String newValue = entry.getValue();
+        if (!newValue.equals(oldValue)) {
+          return true;
+        }
+      }
+    }
+
+    // Check if any replication.source.* keys were removed
+    for (String key : oldReplicationConfigs.keySet()) {
+      if (key.startsWith("replication.source.") && 
!newReplicationConfigs.containsKey(key)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
   @Override
   public void updatePeerConfig(String peerId) throws ReplicationException, 
IOException {
     Lock peerLock = peersLock.acquireLock(peerId);
@@ -121,6 +149,7 @@ public class PeerProcedureHandlerImpl implements PeerProcedureHandler {
       if (
         !ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig)
           || oldConfig.isSerial() != newConfig.isSerial()
+          || hasReplicationConfigChange(oldConfig, newConfig)
           || (oldState.equals(PeerState.ENABLED) && 
newState.equals(PeerState.DISABLED))
       ) {
         replicationSourceManager.refreshSources(peerId);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 2ce15512554..dc5b10628e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -835,4 +835,8 @@ public class ReplicationSource implements ReplicationSourceInterface {
   public long getTotalReplicatedEdits() {
     return totalReplicatedEdits.get();
   }
+
+  long getSleepForRetries() {
+    return sleepForRetries;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 35c217940ee..ba799831c28 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
@@ -316,7 +317,18 @@ public class ReplicationSourceManager {
     WALFileLengthProvider walFileLengthProvider = 
this.walFactory.getWALProvider() != null
       ? this.walFactory.getWALProvider().getWALFileLengthProvider()
       : p -> OptionalLong.empty();
-    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, 
clusterId,
+
+    // Create merged configuration with peer overrides as higher priority and
+    // global config as lower priority
+    Configuration mergedConf = conf;
+    if (!replicationPeer.getPeerConfig().getConfiguration().isEmpty()) {
+      CompoundConfiguration compound = new CompoundConfiguration();
+      compound.add(conf);
+      
compound.addStringMap(replicationPeer.getPeerConfig().getConfiguration());
+      mergedConf = compound;
+    }
+
+    src.init(mergedConf, fs, this, queueStorage, replicationPeer, server, 
queueId, clusterId,
       walFileLengthProvider, new MetricsSource(queueId));
     return src;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 746c845908f..0efab7ef18d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -370,4 +370,8 @@ public class ReplicationSourceShipper extends Thread {
         totalReleasedBytes);
     }
   }
+
+  long getSleepForRetries() {
+    return sleepForRetries;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 26360cbe3ea..e617fe6d016 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -440,4 +440,8 @@ class ReplicationSourceWALReader extends Thread {
   private ReplicationSourceManager getSourceManager() {
     return this.source.getSourceManager();
   }
+
+  long getSleepForRetries() {
+    return sleepForRetries;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestPeerProcedureHandlerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestPeerProcedureHandlerImpl.java
new file mode 100644
index 00000000000..193d1123c0e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestPeerProcedureHandlerImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag(ReplicationTests.TAG)
+@Tag(SmallTests.TAG)
+public class TestPeerProcedureHandlerImpl {
+
+  private ReplicationSourceManager mockSourceManager;
+  private ReplicationPeers mockReplicationPeers;
+  private ReplicationPeerImpl mockPeer;
+  private PeerProcedureHandlerImpl handler;
+  private static final String PEER_ID = "testPeer";
+
+  @BeforeEach
+  public void setup() throws Exception {
+    mockSourceManager = mock(ReplicationSourceManager.class);
+    mockReplicationPeers = mock(ReplicationPeers.class);
+    mockPeer = mock(ReplicationPeerImpl.class);
+
+    
when(mockSourceManager.getReplicationPeers()).thenReturn(mockReplicationPeers);
+    when(mockReplicationPeers.getPeer(PEER_ID)).thenReturn(mockPeer);
+
+    handler = new PeerProcedureHandlerImpl(mockSourceManager);
+  }
+
+  @Test
+  public void testReplicationSourceConfigChangeTriggers() throws Exception {
+    ReplicationPeerConfig oldConfig = 
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+      .putConfiguration("replication.source.sleepforretries", "1000").build();
+
+    ReplicationPeerConfig newConfig = 
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+      .putConfiguration("replication.source.sleepforretries", "5000").build();
+
+    when(mockPeer.getPeerConfig()).thenReturn(oldConfig);
+    when(mockPeer.getPeerState()).thenReturn(PeerState.ENABLED);
+    
when(mockReplicationPeers.refreshPeerConfig(PEER_ID)).thenReturn(newConfig);
+    
when(mockReplicationPeers.refreshPeerState(PEER_ID)).thenReturn(PeerState.ENABLED);
+
+    handler.updatePeerConfig(PEER_ID);
+
+    verify(mockSourceManager, times(1)).refreshSources(PEER_ID);
+  }
+
+  @Test
+  public void testNonReplicationSourceConfigDoesNotTrigger() throws Exception {
+    ReplicationPeerConfig oldConfig = 
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+      .putConfiguration("some.other.config", "value1").build();
+
+    ReplicationPeerConfig newConfig = 
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+      .putConfiguration("some.other.config", "value2").build();
+
+    when(mockPeer.getPeerConfig()).thenReturn(oldConfig);
+    when(mockPeer.getPeerState()).thenReturn(PeerState.ENABLED);
+    
when(mockReplicationPeers.refreshPeerConfig(PEER_ID)).thenReturn(newConfig);
+    
when(mockReplicationPeers.refreshPeerState(PEER_ID)).thenReturn(PeerState.ENABLED);
+
+    handler.updatePeerConfig(PEER_ID);
+
+    verify(mockSourceManager, never()).refreshSources(anyString());
+  }
+
+  @Test
+  public void testNewReplicationSourceConfigTriggers() throws Exception {
+    ReplicationPeerConfig oldConfig =
+      ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster").build();
+
+    ReplicationPeerConfig newConfig = 
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+      .putConfiguration("replication.source.sleepforretries", "5000").build();
+
+    when(mockPeer.getPeerConfig()).thenReturn(oldConfig);
+    when(mockPeer.getPeerState()).thenReturn(PeerState.ENABLED);
+    
when(mockReplicationPeers.refreshPeerConfig(PEER_ID)).thenReturn(newConfig);
+    
when(mockReplicationPeers.refreshPeerState(PEER_ID)).thenReturn(PeerState.ENABLED);
+
+    handler.updatePeerConfig(PEER_ID);
+
+    verify(mockSourceManager, times(1)).refreshSources(PEER_ID);
+  }
+
+  @Test
+  public void testRemovedReplicationSourceConfigTriggers() throws Exception {
+    ReplicationPeerConfig oldConfig = 
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+      .putConfiguration("replication.source.sleepforretries", "2000").build();
+
+    ReplicationPeerConfig newConfig =
+      ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster").build();
+
+    when(mockPeer.getPeerConfig()).thenReturn(oldConfig);
+    when(mockPeer.getPeerState()).thenReturn(PeerState.ENABLED);
+    
when(mockReplicationPeers.refreshPeerConfig(PEER_ID)).thenReturn(newConfig);
+    
when(mockReplicationPeers.refreshPeerState(PEER_ID)).thenReturn(PeerState.ENABLED);
+
+    handler.updatePeerConfig(PEER_ID);
+
+    verify(mockSourceManager, times(1)).refreshSources(PEER_ID);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index fc5d14c7091..2b5c22ced14 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -667,6 +668,125 @@ public abstract class TestReplicationSourceManager {
     assertTrue(latestWals.contains(walName2));
   }
 
+  @Test
+  public void testPeerConfigurationOverridesPropagate() throws Exception {
+    String replicationSourceImplName = 
conf.get("replication.replicationsource.implementation");
+    String peerId = "testConfigOverridePeer";
+    try {
+      conf.set("replication.replicationsource.implementation", 
ReplicationSource.class.getName());
+
+      Configuration globalConf = utility.getConfiguration();
+      long globalSleepValue = 1000L;
+      globalConf.setLong("replication.source.sleepforretries", 
globalSleepValue);
+
+      long peerSleepOverride = 5000L;
+      String clusterKey = "testPeerConfigOverride";
+
+      ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+        .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/" + 
clusterKey)
+        .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName())
+        .putConfiguration("replication.source.sleepforretries", 
String.valueOf(peerSleepOverride))
+        .build();
+
+      manager.getReplicationPeers().getPeerStorage().addPeer(peerId, 
peerConfig, true);
+      manager.addPeer(peerId);
+      utility.waitFor(20000, () -> {
+        ReplicationSourceInterface rs = manager.getSource(peerId);
+        return rs != null && rs.isSourceActive();
+      });
+
+      ReplicationSource source = (ReplicationSource) 
manager.getSources().stream()
+        .filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null);
+      assertNotNull("Source should be created for peer", source);
+
+      assertEquals("ReplicationSource should use peer config override for 
sleepForRetries",
+        peerSleepOverride, source.getSleepForRetries());
+
+      Map<String, ReplicationSourceShipper> workers = source.workerThreads;
+      if (!workers.isEmpty()) {
+        ReplicationSourceShipper shipper = workers.values().iterator().next();
+        assertEquals("ReplicationSourceShipper should use peer config override 
for sleepForRetries",
+          peerSleepOverride, shipper.getSleepForRetries());
+
+        ReplicationSourceWALReader reader = shipper.entryReader;
+        if (reader != null) {
+          assertEquals(
+            "ReplicationSourceWALReader should use peer config override for 
sleepForRetries",
+            peerSleepOverride, reader.getSleepForRetries());
+        }
+      }
+    } finally {
+      conf.set("replication.replicationsource.implementation", 
replicationSourceImplName);
+      removePeerAndWait(peerId);
+    }
+  }
+
+  @Test
+  public void testPeerConfigurationIsolation() throws Exception {
+    String replicationSourceImplName = 
conf.get("replication.replicationsource.implementation");
+    String peerIdWithOverride = "peerWithOverride";
+    String peerIdWithoutOverride = "peerWithoutOverride";
+    try {
+      conf.set("replication.replicationsource.implementation", 
ReplicationSource.class.getName());
+
+      Configuration globalConf = utility.getConfiguration();
+      long globalSleepValue = 1000L;
+      globalConf.setLong("replication.source.sleepforretries", 
globalSleepValue);
+
+      // Create first peer WITH config override
+      long peerSleepOverride = 5000L;
+      String clusterKeyWithOverride = "testPeerWithOverride";
+
+      ReplicationPeerConfig configWithOverride = 
ReplicationPeerConfig.newBuilder()
+        .setClusterKey(
+          utility.getZkCluster().getAddress().toString() + ":/" + 
clusterKeyWithOverride)
+        .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName())
+        .putConfiguration("replication.source.sleepforretries", 
String.valueOf(peerSleepOverride))
+        .build();
+
+      
manager.getReplicationPeers().getPeerStorage().addPeer(peerIdWithOverride, 
configWithOverride,
+        true);
+      manager.addPeer(peerIdWithOverride);
+
+      // Create second peer WITHOUT config override
+      String clusterKeyWithoutOverride = "testPeerWithoutOverride";
+
+      ReplicationPeerConfig configWithoutOverride = 
ReplicationPeerConfig.newBuilder()
+        .setClusterKey(
+          utility.getZkCluster().getAddress().toString() + ":/" + 
clusterKeyWithoutOverride)
+        
.setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build();
+
+      
manager.getReplicationPeers().getPeerStorage().addPeer(peerIdWithoutOverride,
+        configWithoutOverride, true);
+      manager.addPeer(peerIdWithoutOverride);
+
+      // Wait for both peers to be active
+      utility.waitFor(20000, () -> {
+        ReplicationSourceInterface rs1 = manager.getSource(peerIdWithOverride);
+        ReplicationSourceInterface rs2 = 
manager.getSource(peerIdWithoutOverride);
+        return rs1 != null && rs1.isSourceActive() && rs2 != null && 
rs2.isSourceActive();
+      });
+
+      // Verify peer with override uses the override value
+      ReplicationSource sourceWithOverride = (ReplicationSource) 
manager.getSources().stream()
+        .filter(s -> 
s.getPeerId().equals(peerIdWithOverride)).findFirst().orElse(null);
+      assertNotNull("Source with override should be created", 
sourceWithOverride);
+      assertEquals("Peer with override should use override value", 
peerSleepOverride,
+        sourceWithOverride.getSleepForRetries());
+
+      // Verify peer without override uses global config
+      ReplicationSource sourceWithoutOverride = (ReplicationSource) 
manager.getSources().stream()
+        .filter(s -> 
s.getPeerId().equals(peerIdWithoutOverride)).findFirst().orElse(null);
+      assertNotNull("Source without override should be created", 
sourceWithoutOverride);
+      assertEquals("Peer without override should use global config", 
globalSleepValue,
+        sourceWithoutOverride.getSleepForRetries());
+    } finally {
+      conf.set("replication.replicationsource.implementation", 
replicationSourceImplName);
+      removePeerAndWait(peerIdWithOverride);
+      removePeerAndWait(peerIdWithoutOverride);
+    }
+  }
+
   private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) {
     // 1. Create store files for the families
     Map<byte[], List<Path>> storeFiles = new HashMap<>(1);

Reply via email to