This is an automated email from the ASF dual-hosted git repository.

cconnell pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 376d8fd49c2 HBASE-29796 Allow sleepForRetry replication config to be overridden by replication peers (#7577)
376d8fd49c2 is described below

commit 376d8fd49c2822ec1b9dd5e4634ad0885cfed24b
Author: Siddharth Khillon <[email protected]>
AuthorDate: Tue Jan 6 13:25:45 2026 -0800

    HBASE-29796 Allow sleepForRetry replication config to be overridden by replication peers (#7577)
    
    Co-authored-by: skhillon <[email protected]>
    Signed-off-by: <[email protected]>
---
 .../regionserver/PeerProcedureHandlerImpl.java     |  29 +++++
 .../regionserver/ReplicationSource.java            |   4 +
 .../regionserver/ReplicationSourceManager.java     |  14 ++-
 .../regionserver/ReplicationSourceShipper.java     |   4 +
 .../regionserver/ReplicationSourceWALReader.java   |   4 +
 .../regionserver/TestPeerProcedureHandlerImpl.java | 130 +++++++++++++++++++++
 .../regionserver/TestReplicationSourceManager.java | 100 ++++++++++++++++
 7 files changed, 284 insertions(+), 1 deletion(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
index cd3db44d8fa..170eef36e0d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/PeerProcedureHandlerImpl.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.LogRoller;
@@ -108,6 +109,33 @@ public class PeerProcedureHandlerImpl implements 
PeerProcedureHandler {
     refreshPeerState(peerId);
   }
 
+  private boolean hasReplicationConfigChange(ReplicationPeerConfig oldConfig,
+    ReplicationPeerConfig newConfig) {
+    Map<String, String> oldReplicationConfigs = oldConfig.getConfiguration();
+    Map<String, String> newReplicationConfigs = newConfig.getConfiguration();
+
+    // Check if any replication.source.* keys have changed values
+    for (Map.Entry<String, String> entry : newReplicationConfigs.entrySet()) {
+      String key = entry.getKey();
+      if (key.startsWith("replication.source.")) {
+        String oldValue = oldReplicationConfigs.get(key);
+        String newValue = entry.getValue();
+        if (!newValue.equals(oldValue)) {
+          return true;
+        }
+      }
+    }
+
+    // Check if any replication.source.* keys were removed
+    for (String key : oldReplicationConfigs.keySet()) {
+      if (key.startsWith("replication.source.") && 
!newReplicationConfigs.containsKey(key)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
   @Override
   public void updatePeerConfig(String peerId) throws ReplicationException, 
IOException {
     Lock peerLock = peersLock.acquireLock(peerId);
@@ -131,6 +159,7 @@ public class PeerProcedureHandlerImpl implements 
PeerProcedureHandler {
       if (
         !ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig)
           || oldConfig.isSerial() != newConfig.isSerial()
+          || hasReplicationConfigChange(oldConfig, newConfig)
           || (oldState.equals(PeerState.ENABLED) && 
newState.equals(PeerState.DISABLED))
       ) {
         replicationSourceManager.refreshSources(peerId);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index dc17ed12ff0..4e122ef5e8b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -862,4 +862,8 @@ public class ReplicationSource implements 
ReplicationSourceInterface {
   public long getTotalReplicatedEdits() {
     return totalReplicatedEdits.get();
   }
+
+  long getSleepForRetries() {
+    return sleepForRetries;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index ffaabe7e339..e67c27d86a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -47,6 +47,7 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
@@ -318,7 +319,18 @@ public class ReplicationSourceManager {
     WALFileLengthProvider walFileLengthProvider = 
this.walFactory.getWALProvider() != null
       ? this.walFactory.getWALProvider().getWALFileLengthProvider()
       : p -> OptionalLong.empty();
-    src.init(conf, fs, this, queueStorage, replicationPeer, server, queueData, 
clusterId,
+
+    // Create merged configuration with peer overrides as higher priority and
+    // global config as lower priority
+    Configuration mergedConf = conf;
+    if (!replicationPeer.getPeerConfig().getConfiguration().isEmpty()) {
+      CompoundConfiguration compound = new CompoundConfiguration();
+      compound.add(conf);
+      
compound.addStringMap(replicationPeer.getPeerConfig().getConfiguration());
+      mergedConf = compound;
+    }
+
+    src.init(mergedConf, fs, this, queueStorage, replicationPeer, server, 
queueData, clusterId,
       walFileLengthProvider, new MetricsSource(queueData.getId().toString()));
     return src;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 4709e607fc7..d05e4fed045 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -355,4 +355,8 @@ public class ReplicationSourceShipper extends Thread {
         totalReleasedBytes);
     }
   }
+
+  long getSleepForRetries() {
+    return sleepForRetries;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index fe983c9f3ae..e421042614c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -436,4 +436,8 @@ class ReplicationSourceWALReader extends Thread {
   private ReplicationSourceManager getSourceManager() {
     return this.source.getSourceManager();
   }
+
+  long getSleepForRetries() {
+    return sleepForRetries;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestPeerProcedureHandlerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestPeerProcedureHandlerImpl.java
new file mode 100644
index 00000000000..5a3f2bd1eeb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestPeerProcedureHandlerImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag(ReplicationTests.TAG)
+@Tag(SmallTests.TAG)
+public class TestPeerProcedureHandlerImpl {
+
+  private ReplicationSourceManager mockSourceManager;
+  private ReplicationPeers mockReplicationPeers;
+  private ReplicationPeerImpl mockPeer;
+  private PeerProcedureHandlerImpl handler;
+  private static final String PEER_ID = "testPeer";
+
+  @BeforeEach
+  public void setup() throws Exception {
+    mockSourceManager = mock(ReplicationSourceManager.class);
+    mockReplicationPeers = mock(ReplicationPeers.class);
+    mockPeer = mock(ReplicationPeerImpl.class);
+
+    
when(mockSourceManager.getReplicationPeers()).thenReturn(mockReplicationPeers);
+    when(mockReplicationPeers.getPeer(PEER_ID)).thenReturn(mockPeer);
+
+    handler = new PeerProcedureHandlerImpl(mockSourceManager, null);
+  }
+
+  @Test
+  public void testReplicationSourceConfigChangeTriggers() throws Exception {
+    ReplicationPeerConfig oldConfig = 
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+      .putConfiguration("replication.source.sleepforretries", "1000").build();
+
+    ReplicationPeerConfig newConfig = 
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+      .putConfiguration("replication.source.sleepforretries", "5000").build();
+
+    when(mockPeer.getPeerConfig()).thenReturn(oldConfig);
+    when(mockPeer.getPeerState()).thenReturn(PeerState.ENABLED);
+    
when(mockReplicationPeers.refreshPeerConfig(PEER_ID)).thenReturn(newConfig);
+    
when(mockReplicationPeers.refreshPeerState(PEER_ID)).thenReturn(PeerState.ENABLED);
+
+    handler.updatePeerConfig(PEER_ID);
+
+    verify(mockSourceManager, times(1)).refreshSources(PEER_ID);
+  }
+
+  @Test
+  public void testNonReplicationSourceConfigDoesNotTrigger() throws Exception {
+    ReplicationPeerConfig oldConfig = 
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+      .putConfiguration("some.other.config", "value1").build();
+
+    ReplicationPeerConfig newConfig = 
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+      .putConfiguration("some.other.config", "value2").build();
+
+    when(mockPeer.getPeerConfig()).thenReturn(oldConfig);
+    when(mockPeer.getPeerState()).thenReturn(PeerState.ENABLED);
+    
when(mockReplicationPeers.refreshPeerConfig(PEER_ID)).thenReturn(newConfig);
+    
when(mockReplicationPeers.refreshPeerState(PEER_ID)).thenReturn(PeerState.ENABLED);
+
+    handler.updatePeerConfig(PEER_ID);
+
+    verify(mockSourceManager, never()).refreshSources(anyString());
+  }
+
+  @Test
+  public void testNewReplicationSourceConfigTriggers() throws Exception {
+    ReplicationPeerConfig oldConfig =
+      ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster").build();
+
+    ReplicationPeerConfig newConfig = 
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+      .putConfiguration("replication.source.sleepforretries", "5000").build();
+
+    when(mockPeer.getPeerConfig()).thenReturn(oldConfig);
+    when(mockPeer.getPeerState()).thenReturn(PeerState.ENABLED);
+    
when(mockReplicationPeers.refreshPeerConfig(PEER_ID)).thenReturn(newConfig);
+    
when(mockReplicationPeers.refreshPeerState(PEER_ID)).thenReturn(PeerState.ENABLED);
+
+    handler.updatePeerConfig(PEER_ID);
+
+    verify(mockSourceManager, times(1)).refreshSources(PEER_ID);
+  }
+
+  @Test
+  public void testRemovedReplicationSourceConfigTriggers() throws Exception {
+    ReplicationPeerConfig oldConfig = 
ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster")
+      .putConfiguration("replication.source.sleepforretries", "2000").build();
+
+    ReplicationPeerConfig newConfig =
+      ReplicationPeerConfig.newBuilder().setClusterKey("oldCluster").build();
+
+    when(mockPeer.getPeerConfig()).thenReturn(oldConfig);
+    when(mockPeer.getPeerState()).thenReturn(PeerState.ENABLED);
+    
when(mockReplicationPeers.refreshPeerConfig(PEER_ID)).thenReturn(newConfig);
+    
when(mockReplicationPeers.refreshPeerState(PEER_ID)).thenReturn(PeerState.ENABLED);
+
+    handler.updatePeerConfig(PEER_ID);
+
+    verify(mockSourceManager, times(1)).refreshSources(PEER_ID);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 663b444dc4e..aadf5982b68 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
@@ -373,4 +374,103 @@ public class TestReplicationSourceManager {
     manager.cleanOldLogs(walName, true, source);
     assertFalse(FS.exists(remoteWAL));
   }
+
+  @Test
+  public void testPeerConfigurationOverridesPropagate() throws Exception {
+    Configuration globalConf = UTIL.getConfiguration();
+    long globalSleepValue = 1000L;
+    globalConf.setLong("replication.source.sleepforretries", globalSleepValue);
+
+    long peerSleepOverride = 5000L;
+    String peerId = "testConfigOverridePeer";
+    String clusterKey = "testPeerConfigOverride";
+
+    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + 
clusterKey)
+      .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName())
+      .putConfiguration("replication.source.sleepforretries", 
String.valueOf(peerSleepOverride))
+      .build();
+
+    manager.getReplicationPeers().getPeerStorage().addPeer(peerId, peerConfig, 
true,
+      SyncReplicationState.NONE);
+    manager.addPeer(peerId);
+    UTIL.waitFor(20000, () -> {
+      ReplicationSourceInterface rs = manager.getSource(peerId);
+      return rs != null && rs.isSourceActive();
+    });
+
+    ReplicationSource source = (ReplicationSource) 
manager.getSources().stream()
+      .filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null);
+    assertNotNull("Source should be created for peer", source);
+
+    assertEquals("ReplicationSource should use peer config override for 
sleepForRetries",
+      peerSleepOverride, source.getSleepForRetries());
+
+    Map<String, ReplicationSourceShipper> workers = source.workerThreads;
+    if (!workers.isEmpty()) {
+      ReplicationSourceShipper shipper = workers.values().iterator().next();
+      assertEquals("ReplicationSourceShipper should use peer config override 
for sleepForRetries",
+        peerSleepOverride, shipper.getSleepForRetries());
+
+      ReplicationSourceWALReader reader = shipper.entryReader;
+      if (reader != null) {
+        assertEquals(
+          "ReplicationSourceWALReader should use peer config override for 
sleepForRetries",
+          peerSleepOverride, reader.getSleepForRetries());
+      }
+    }
+
+    removePeerAndWait(peerId);
+  }
+
+  @Test
+  public void testPeerConfigurationIsolation() throws Exception {
+    Configuration globalConf = UTIL.getConfiguration();
+    long globalSleepValue = 1000L;
+    globalConf.setLong("replication.source.sleepforretries", globalSleepValue);
+
+    // Create first peer WITH config override
+    long peerSleepOverride = 5000L;
+    String peerIdWithOverride = "peerWithOverride";
+    String clusterKeyWithOverride = "testPeerWithOverride";
+
+    ReplicationPeerConfig configWithOverride = 
ReplicationPeerConfig.newBuilder()
+      .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + 
clusterKeyWithOverride)
+      .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName())
+      .putConfiguration("replication.source.sleepforretries", 
String.valueOf(peerSleepOverride))
+      .build();
+
+    manager.getReplicationPeers().getPeerStorage().addPeer(peerIdWithOverride, 
configWithOverride,
+      true, SyncReplicationState.NONE);
+    manager.addPeer(peerIdWithOverride);
+
+    // Create second peer WITHOUT config override
+    String peerIdWithoutOverride = "peerWithoutOverride";
+    String clusterKeyWithoutOverride = "testPeerWithoutOverride";
+    addPeerAndWait(peerIdWithoutOverride, clusterKeyWithoutOverride, false);
+
+    // Wait for both peers to be active
+    UTIL.waitFor(20000, () -> {
+      ReplicationSourceInterface rs1 = manager.getSource(peerIdWithOverride);
+      ReplicationSourceInterface rs2 = 
manager.getSource(peerIdWithoutOverride);
+      return rs1 != null && rs1.isSourceActive() && rs2 != null && 
rs2.isSourceActive();
+    });
+
+    // Verify peer with override uses the override value
+    ReplicationSource sourceWithOverride = (ReplicationSource) 
manager.getSources().stream()
+      .filter(s -> 
s.getPeerId().equals(peerIdWithOverride)).findFirst().orElse(null);
+    assertNotNull("Source with override should be created", 
sourceWithOverride);
+    assertEquals("Peer with override should use override value", 
peerSleepOverride,
+      sourceWithOverride.getSleepForRetries());
+
+    // Verify peer without override uses global config
+    ReplicationSource sourceWithoutOverride = (ReplicationSource) 
manager.getSources().stream()
+      .filter(s -> 
s.getPeerId().equals(peerIdWithoutOverride)).findFirst().orElse(null);
+    assertNotNull("Source without override should be created", 
sourceWithoutOverride);
+    assertEquals("Peer without override should use global config", 
globalSleepValue,
+      sourceWithoutOverride.getSleepForRetries());
+
+    removePeerAndWait(peerIdWithOverride);
+    removePeerAndWait(peerIdWithoutOverride);
+  }
 }

Reply via email to