This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
     new dfc14db550 PHOENIX-7845 ReplicationLogGroup initialization resilience to standby cluster unavailability (#2466)
dfc14db550 is described below

commit dfc14db55099b3f8ef19b842760d0b95ade101db
Author: tkhurana <[email protected]>
AuthorDate: Mon May 11 14:39:16 2026 -0700

    PHOENIX-7845 ReplicationLogGroup initialization resilience to standby cluster unavailability (#2466)
---
 .../phoenix/hbase/index/IndexRegionObserver.java   |   4 +-
 .../ReplicationLogDiscoveryForwarder.java          |  14 +-
 .../phoenix/replication/ReplicationLogGroup.java   | 150 ++++++++++++++-------
 .../replication/StoreAndForwardModeImpl.java       |   2 +-
 .../replication/SyncAndForwardModeImpl.java        |   3 +-
 .../apache/phoenix/replication/SyncModeImpl.java   |   3 +-
 .../phoenix/replication/ReplicationLogGroupIT.java |   2 +-
 .../replication/ReplicationLogBaseTest.java        |  10 +-
 .../ReplicationLogDiscoveryForwarderTest.java      |  53 ++++++++
 .../replication/ReplicationLogGroupTest.java       |  82 ++++++++++-
 10 files changed, 260 insertions(+), 63 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 2548ee5f98..1ad0579a45 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -683,7 +683,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
    * @return HA group if present or empty if missing
    */
   private Optional<ReplicationLogGroup> 
getHAGroupFromBatch(RegionCoprocessorEnvironment env,
-    MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+    MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
     if (miniBatchOp.size() > 0) {
       Mutation m = miniBatchOp.getOperation(0);
       byte[] haGroupName = 
m.getAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
@@ -702,7 +702,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
    * @return HA group if present or empty if missing
    */
   private Optional<ReplicationLogGroup> 
getHAGroupFromWALKey(RegionCoprocessorEnvironment env,
-    org.apache.hadoop.hbase.wal.WALKey logKey) {
+    org.apache.hadoop.hbase.wal.WALKey logKey) throws IOException {
     byte[] haGroupName =
       
logKey.getExtendedAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
     if (haGroupName != null) {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java
index d80994369c..1757421599 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java
@@ -57,6 +57,12 @@ public class ReplicationLogDiscoveryForwarder extends 
ReplicationLogDiscovery {
   public static final String 
REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY =
     "phoenix.replication.forwarder.waiting.buffer.percentage";
 
+  /**
+   * Configuration key for in-progress directory processing probability 
(percentage)
+   */
+  public static final String 
REPLICATION_FORWARDER_IN_PROGRESS_PROCESSING_PROBABILITY_KEY =
+    "phoenix.replication.forwarder.in.progress.processing.probability";
+
   private final ReplicationLogGroup logGroup;
   private final double copyThroughputThresholdBytesPerMs;
   // the timestamp (in future) at which we will attempt to set the HAGroup 
state to SYNC
@@ -134,7 +140,7 @@ public class ReplicationLogDiscoveryForwarder extends 
ReplicationLogDiscovery {
     FileSystem srcFS = replicationLogTracker.getFileSystem();
     FileStatus srcStat = srcFS.getFileStatus(src);
     long ts = replicationLogTracker.getFileTimestamp(srcStat.getPath());
-    ReplicationShardDirectoryManager remoteShardManager = 
logGroup.getPeerShardManager();
+    ReplicationShardDirectoryManager remoteShardManager = 
logGroup.getOrCreatePeerShardManager();
     Path dst = remoteShardManager.getWriterPath(ts, 
logGroup.getServerName().getServerName());
     long startTime = EnvironmentEdgeManager.currentTimeMillis();
     FileUtil.copy(srcFS, srcStat, remoteShardManager.getFileSystem(), dst, 
false, false, conf);
@@ -227,4 +233,10 @@ public class ReplicationLogDiscoveryForwarder extends 
ReplicationLogDiscovery {
     return 
getConf().getDouble(REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY,
       DEFAULT_WAITING_BUFFER_PERCENTAGE);
   }
+
+  @Override
+  public double getInProgressDirectoryProcessProbability() {
+    return 
getConf().getDouble(REPLICATION_FORWARDER_IN_PROGRESS_PROCESSING_PROBABILITY_KEY,
+      super.getInProgressDirectoryProcessProbability());
+  }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index c88d9b2647..ded1fec7b1 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -36,6 +36,7 @@ import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.io.UncheckedIOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
@@ -165,6 +166,9 @@ public class ReplicationLogGroup {
   public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
     "phoenix.replication.log.retry.delay.ms";
   public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
+  public static final String REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY =
+    "phoenix.replication.log.peer.init.timeout.ms";
+  public static final long DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS = 
10_000L;
   public static final String WAL_SYNC_TIMEOUT_MS_KEY = 
"hbase.regionserver.wal.sync.timeout";
   public static final long DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000L;
 
@@ -180,10 +184,15 @@ public class ReplicationLogGroup {
   protected final String haGroupName;
   protected final HAGroupStoreManager haGroupStoreManager;
   protected final MetricsReplicationLogGroupSource metrics;
-  protected ReplicationShardDirectoryManager peerShardManager;
+  // Cached at init time — HDFS URLs (local and peer) are fixed for the 
lifetime of this group.
+  // URL changes require RS restart.
+  protected HAGroupStoreRecord haGroupStoreRecord;
   protected ReplicationShardDirectoryManager localShardManager;
+  protected volatile ReplicationShardDirectoryManager peerShardManager;
+  private CompletableFuture<ReplicationShardDirectoryManager> 
peerShardManagerFuture;
   protected ReplicationLogDiscoveryForwarder logForwarder;
   protected long syncTimeoutMs;
+  protected long peerInitTimeoutMs;
   protected volatile boolean closed = false;
 
   /**
@@ -331,20 +340,24 @@ public class ReplicationLogGroup {
    * @param serverName  The server name
    * @param haGroupName The HA Group name
    * @return ReplicationLogGroup instance
-   * @throws RuntimeException if initialization fails
+   * @throws IOException if initialization fails
    */
   public static ReplicationLogGroup get(Configuration conf, ServerName 
serverName,
-    String haGroupName) {
-    return INSTANCES.computeIfAbsent(haGroupName, k -> {
-      try {
-        ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, 
haGroupName);
-        group.init();
-        return group;
-      } catch (IOException e) {
-        LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", 
haGroupName, e);
-        throw new RuntimeException(e);
-      }
-    });
+    String haGroupName) throws IOException {
+    try {
+      return INSTANCES.computeIfAbsent(haGroupName, k -> {
+        try {
+          ReplicationLogGroup group = new ReplicationLogGroup(conf, 
serverName, haGroupName);
+          group.init();
+          return group;
+        } catch (IOException e) {
+          LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", 
haGroupName, e);
+          throw new UncheckedIOException(e);
+        }
+      });
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
+    }
   }
 
   /**
@@ -354,21 +367,25 @@ public class ReplicationLogGroup {
    * @param haGroupName         The HA Group name
    * @param haGroupStoreManager HA Group Store Manager instance
    * @return ReplicationLogGroup instance
-   * @throws RuntimeException if initialization fails
+   * @throws IOException if initialization fails
    */
   public static ReplicationLogGroup get(Configuration conf, ServerName 
serverName,
-    String haGroupName, HAGroupStoreManager haGroupStoreManager) {
-    return INSTANCES.computeIfAbsent(haGroupName, k -> {
-      try {
-        ReplicationLogGroup group =
-          new ReplicationLogGroup(conf, serverName, haGroupName, 
haGroupStoreManager);
-        group.init();
-        return group;
-      } catch (IOException e) {
-        LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", 
haGroupName, e);
-        throw new RuntimeException(e);
-      }
-    });
+    String haGroupName, HAGroupStoreManager haGroupStoreManager) throws 
IOException {
+    try {
+      return INSTANCES.computeIfAbsent(haGroupName, k -> {
+        try {
+          ReplicationLogGroup group =
+            new ReplicationLogGroup(conf, serverName, haGroupName, 
haGroupStoreManager);
+          group.init();
+          return group;
+        } catch (IOException e) {
+          LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", 
haGroupName, e);
+          throw new UncheckedIOException(e);
+        }
+      });
+    } catch (UncheckedIOException e) {
+      throw e.getCause();
+    }
   }
 
   /**
@@ -421,9 +438,10 @@ public class ReplicationLogGroup {
       throw new IOException(message);
     }
     HAGroupStoreRecord record = haRecord.get();
-    // First initialize the shard managers
-    this.peerShardManager = createPeerShardManager(record);
-    this.localShardManager = createLocalShardManager(record);
+    this.haGroupStoreRecord = record;
+    this.localShardManager = createLocalShardManager();
+    this.peerInitTimeoutMs = 
conf.getLong(REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY,
+      DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS);
     // Initialize the replication log forwarder. The log forwarder is only 
activated when
     // we switch to STORE_AND_FORWARD or SYNC_AND_FORWARD mode
     this.logForwarder = new ReplicationLogDiscoveryForwarder(this);
@@ -783,21 +801,61 @@ public class ReplicationLogGroup {
     }
   }
 
-  /** create shard manager for the standby cluster */
-  protected ReplicationShardDirectoryManager 
createPeerShardManager(HAGroupStoreRecord record)
-    throws IOException {
-    return createShardManager(record.getPeerHdfsUrl(), STANDBY_DIR);
+  /** create shard manager for the fallback cluster */
+  protected ReplicationShardDirectoryManager createLocalShardManager() throws 
IOException {
+    return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR);
   }
 
-  /** create shard manager for the fallback cluster */
-  protected ReplicationShardDirectoryManager 
createLocalShardManager(HAGroupStoreRecord record)
-    throws IOException {
-    return createShardManager(record.getHdfsUrl(), FALLBACK_DIR);
+  /**
+   * Get or create the peer shard manager. Thread-safe and idempotent — the 
first successful
+   * creation is cached; subsequent calls return the cached instance. Bounded 
by
+   * {@link #REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY} to prevent blocking the 
disruptor handler
+   * thread on a peer NN outage.
+   */
+  protected ReplicationShardDirectoryManager getOrCreatePeerShardManager() 
throws IOException {
+    ReplicationShardDirectoryManager cached = peerShardManager;
+    if (cached != null) {
+      return cached;
+    }
+    synchronized (this) {
+      if (peerShardManager != null) {
+        return peerShardManager;
+      }
+      if (peerShardManagerFuture == null || 
peerShardManagerFuture.isCompletedExceptionally()) {
+        // retry
+        peerShardManagerFuture = CompletableFuture.supplyAsync(() -> {
+          try {
+            return createPeerShardManager();
+          } catch (IOException e) {
+            throw new UncheckedIOException(e);
+          }
+        });
+      }
+      try {
+        peerShardManager = peerShardManagerFuture.get(peerInitTimeoutMs, 
TimeUnit.MILLISECONDS);
+        return peerShardManager;
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof UncheckedIOException) {
+          throw ((UncheckedIOException) cause).getCause();
+        }
+        if (cause instanceof IOException) {
+          throw (IOException) cause;
+        }
+        throw new IOException("Failed to create peer shard manager", cause);
+      } catch (TimeoutException e) {
+        throw new IOException("Timed out creating peer shard manager after " + 
peerInitTimeoutMs
+          + "ms for " + haGroupName, e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while creating peer shard manager", 
e);
+      }
+    }
   }
 
-  /** return shard manager for the standby cluster */
-  protected ReplicationShardDirectoryManager getPeerShardManager() {
-    return peerShardManager;
+  /** Create a new peer shard manager for the standby cluster */
+  protected ReplicationShardDirectoryManager createPeerShardManager() throws 
IOException {
+    return createShardManager(haGroupStoreRecord.getPeerHdfsUrl(), 
STANDBY_DIR);
   }
 
   /** return shard manager for the fallback cluster */
@@ -809,14 +867,10 @@ public class ReplicationLogGroup {
     return FileSystem.get(uri, conf);
   }
 
-  /** Create the standby(synchronous) writer */
-  protected ReplicationLog createStandbyLog() throws IOException {
-    return new ReplicationLog(this, peerShardManager);
-  }
-
-  /** Create the fallback writer */
-  protected ReplicationLog createFallbackLog() throws IOException {
-    return new ReplicationLog(this, localShardManager);
+  /** Create a replication log using the given shard manager */
+  protected ReplicationLog 
createReplicationLog(ReplicationShardDirectoryManager shardManager)
+    throws IOException {
+    return new ReplicationLog(this, shardManager);
   }
 
   /** Returns the log forwarder for this replication group */
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
index ea18a5b853..7bcb4c89e0 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
@@ -55,7 +55,7 @@ public class StoreAndForwardModeImpl extends 
ReplicationModeImpl {
   void onEnter() throws IOException {
     LOG.info("HAGroup {} entered mode {}", logGroup, this);
     // create a log on the fallback cluster
-    log = logGroup.createFallbackLog();
+    log = logGroup.createReplicationLog(logGroup.getLocalShardManager());
     log.init();
     // Schedule task to periodically set the HAGroupStore state to 
ACTIVE_NOT_IN_SYNC
     startHAGroupStoreUpdateTask();
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
index aefce975cf..02c57f7f3f 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
@@ -44,7 +44,8 @@ public class SyncAndForwardModeImpl extends 
ReplicationModeImpl {
   void onEnter() throws IOException {
     LOG.info("HAGroup {} entered mode {}", logGroup, this);
     // create a log on the standby cluster
-    log = logGroup.createStandbyLog();
+    ReplicationShardDirectoryManager peerShardManager = 
logGroup.getOrCreatePeerShardManager();
+    log = logGroup.createReplicationLog(peerShardManager);
     log.init();
     // no-op if the forwarder is already started
     logGroup.getLogForwarder().start();
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
index 75e175ad4e..0a5b5a48c6 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
@@ -43,7 +43,8 @@ public class SyncModeImpl extends ReplicationModeImpl {
   void onEnter() throws IOException {
     LOG.info("HAGroup {} entered mode {}", logGroup, this);
     // create a log on the standby cluster
-    log = logGroup.createStandbyLog();
+    ReplicationShardDirectoryManager peerShardManager = 
logGroup.getOrCreatePeerShardManager();
+    log = logGroup.createReplicationLog(peerShardManager);
     log.init();
   }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
index 949c3f2029..651cddde6c 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
@@ -124,7 +124,7 @@ public class ReplicationLogGroupIT extends HABaseIT {
     LogFileAnalyzer analyzer = new LogFileAnalyzer();
     // use peer cluster conf
     analyzer.setConf(conf2);
-    Path standByLogDir = logGroup.getPeerShardManager().getRootDirectoryPath();
+    Path standByLogDir = 
logGroup.getOrCreatePeerShardManager().getRootDirectoryPath();
     LOG.info("Analyzing log files at {}", standByLogDir);
     String[] args = { "--check", standByLogDir.toString() };
     assertEquals(0, analyzer.run(args));
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
index f748e3d1a8..d49e2ffc94 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
@@ -165,13 +165,9 @@ public class ReplicationLogBaseTest {
     }
 
     @Override
-    protected ReplicationLog createStandbyLog() throws IOException {
-      return spy(new TestableLog(this, peerShardManager, useAlignedRotation));
-    }
-
-    @Override
-    protected ReplicationLog createFallbackLog() throws IOException {
-      return spy(new TestableLog(this, localShardManager, useAlignedRotation));
+    protected ReplicationLog 
createReplicationLog(ReplicationShardDirectoryManager shardManager)
+      throws IOException {
+      return spy(new TestableLog(this, shardManager, useAlignedRotation));
     }
 
   }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
index 46efbf196e..65fb764a31 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
 
 import java.io.IOException;
 import java.util.concurrent.Callable;
@@ -179,4 +180,56 @@ public class ReplicationLogDiscoveryForwarderTest extends 
ReplicationLogBaseTest
     }
     assertEquals(SYNC, logGroup.getMode());
   }
+
+  /**
+   * Tests that the forwarder retries peer shard manager creation when the 
peer is initially
+   * unavailable. On the first attempt, getOrCreatePeerShardManager throws; 
the file is marked
+   * failed and retried via in-progress processing. On the retry the peer 
becomes available and
+   * forwarding succeeds.
+   */
+  @Test
+  public void testForwarderRetriesPeerCreation() throws Exception {
+    final String tableName = "TBLFWDRETRY";
+    final long count = 10L;
+
+    // Ensure in-progress files are immediately eligible for retry and always 
processed
+    
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
 0);
+    conf.setDouble(
+      
ReplicationLogDiscoveryForwarder.REPLICATION_FORWARDER_IN_PROGRESS_PROCESSING_PROBABILITY_KEY,
+      100.0);
+    // Recreate the log group with the updated config
+    recreateLogGroup();
+    assertEquals(STORE_AND_FORWARD, logGroup.getMode());
+
+    // Make getOrCreatePeerShardManager fail on the first call, then succeed 
on subsequent calls
+    doThrow(new IOException("Peer namenode 
unavailable")).doCallRealMethod().when(logGroup)
+      .getOrCreatePeerShardManager();
+
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) {
+        logGroup.setMode(SYNC);
+        try {
+          logGroup.sync();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        return 0L;
+      }
+    }).when(haGroupStoreManager).setHAGroupStatusToSync(haGroupName);
+
+    // Write some data so the forwarder has files to process
+    for (long id = 1; id <= count; ++id) {
+      Mutation put = LogFileTestUtil.newPut("row_" + id, id, 2);
+      logGroup.append(tableName, id, put);
+    }
+    logGroup.sync();
+
+    // Wait for the forwarder to eventually succeed after retrying peer 
creation
+    long deadline = EnvironmentEdgeManager.currentTimeMillis() + 120_000;
+    while (logGroup.getMode() != SYNC && 
EnvironmentEdgeManager.currentTimeMillis() < deadline) {
+      Thread.sleep(500);
+    }
+    assertEquals(SYNC, logGroup.getMode());
+  }
 }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 727959541e..2bf723defd 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -1709,7 +1709,8 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
       doAnswer(poisonNewWriter).when(log).createNewWriter();
       return log;
     };
-    doAnswer(poisonLog).when(logGroup).createFallbackLog();
+    doAnswer(poisonLog).when(logGroup)
+      .createReplicationLog(any(ReplicationShardDirectoryManager.class));
 
     // Poison the already-initialized SYNC log
     ReplicationLog activeLog = logGroup.getActiveLog();
@@ -1793,4 +1794,83 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     recreateLogGroup();
     assertEquals("Explicit override should take precedence", 5000L, 
logGroup.syncTimeoutMs);
   }
+
+  /**
+   * Tests that when the peer cluster is unavailable at startup, the group 
degrades from SYNC to
+   * STORE_AND_FORWARD and remains functional (append/sync work).
+   */
+  @Test
+  public void testInitDegradesToSafWhenPeerUnavailable() throws Exception {
+    final String tableName = "TBLDEG";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName, 
haGroupName,
+      haGroupStoreManager, useAlignedRotation()));
+    doThrow(new IOException("Standby namenode unavailable")).when(group)
+      .getOrCreatePeerShardManager();
+    group.init();
+
+    try {
+      // Should have degraded to STORE_AND_FORWARD
+      assertEquals(STORE_AND_FORWARD, group.getMode());
+
+      // Verify the group is functional — append and sync should work via 
local shard manager
+      group.append(tableName, commitId, put);
+      group.sync();
+    } finally {
+      group.close();
+    }
+  }
+
+  /**
+   * Tests that when the local cluster is unavailable at startup, init fails 
with IOException.
+   * Neither SYNC nor SAF mode can operate without a local shard manager.
+   */
+  @Test
+  public void testInitFailsWhenLocalUnavailable() throws Exception {
+    ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName, 
haGroupName,
+      haGroupStoreManager, useAlignedRotation()));
+    doThrow(new IOException("Local namenode 
unavailable")).when(group).createLocalShardManager();
+    try {
+      group.init();
+      fail("Should have thrown IOException when local shard manager is 
unavailable");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Local namenode unavailable"));
+    }
+  }
+
+  /**
+   * Tests that when peer shard manager creation exceeds the configured 
timeout, the group degrades
+   * to STORE_AND_FORWARD instead of blocking the disruptor handler thread 
indefinitely.
+   */
+  @Test
+  public void testInitDegradesToSafWhenPeerInitTimesOut() throws Exception {
+    final String tableName = "TBLTIMEOUT";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    // Set a very short peer init timeout
+    conf.setLong(ReplicationLogGroup.REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY, 
100L);
+
+    ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName, 
haGroupName,
+      haGroupStoreManager, useAlignedRotation()));
+    // Make createPeerShardManager block longer than the configured timeout
+    doAnswer(invocation -> {
+      Thread.sleep(5000);
+      return invocation.callRealMethod();
+    }).when(group).createPeerShardManager();
+    group.init();
+
+    try {
+      // Should have degraded to STORE_AND_FORWARD due to timeout
+      assertEquals(STORE_AND_FORWARD, group.getMode());
+
+      // Verify the group is functional
+      group.append(tableName, commitId, put);
+      group.sync();
+    } finally {
+      group.close();
+    }
+  }
 }

Reply via email to