This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new dfc14db550 PHOENIX-7845 ReplicationLogGroup initialization resilience
to standby cluster unavailability (#2466)
dfc14db550 is described below
commit dfc14db55099b3f8ef19b842760d0b95ade101db
Author: tkhurana <[email protected]>
AuthorDate: Mon May 11 14:39:16 2026 -0700
PHOENIX-7845 ReplicationLogGroup initialization resilience to standby
cluster unavailability (#2466)
---
.../phoenix/hbase/index/IndexRegionObserver.java | 4 +-
.../ReplicationLogDiscoveryForwarder.java | 14 +-
.../phoenix/replication/ReplicationLogGroup.java | 150 ++++++++++++++-------
.../replication/StoreAndForwardModeImpl.java | 2 +-
.../replication/SyncAndForwardModeImpl.java | 3 +-
.../apache/phoenix/replication/SyncModeImpl.java | 3 +-
.../phoenix/replication/ReplicationLogGroupIT.java | 2 +-
.../replication/ReplicationLogBaseTest.java | 10 +-
.../ReplicationLogDiscoveryForwarderTest.java | 53 ++++++++
.../replication/ReplicationLogGroupTest.java | 82 ++++++++++-
10 files changed, 260 insertions(+), 63 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 2548ee5f98..1ad0579a45 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -683,7 +683,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
* @return HA group if present or empty if missing
*/
private Optional<ReplicationLogGroup>
getHAGroupFromBatch(RegionCoprocessorEnvironment env,
- MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
if (miniBatchOp.size() > 0) {
Mutation m = miniBatchOp.getOperation(0);
byte[] haGroupName =
m.getAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
@@ -702,7 +702,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
* @return HA group if present or empty if missing
*/
private Optional<ReplicationLogGroup>
getHAGroupFromWALKey(RegionCoprocessorEnvironment env,
- org.apache.hadoop.hbase.wal.WALKey logKey) {
+ org.apache.hadoop.hbase.wal.WALKey logKey) throws IOException {
byte[] haGroupName =
logKey.getExtendedAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
if (haGroupName != null) {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java
index d80994369c..1757421599 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarder.java
@@ -57,6 +57,12 @@ public class ReplicationLogDiscoveryForwarder extends
ReplicationLogDiscovery {
public static final String
REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY =
"phoenix.replication.forwarder.waiting.buffer.percentage";
+ /**
+ * Configuration key for in-progress directory processing probability
(percentage)
+ */
+ public static final String
REPLICATION_FORWARDER_IN_PROGRESS_PROCESSING_PROBABILITY_KEY =
+ "phoenix.replication.forwarder.in.progress.processing.probability";
+
private final ReplicationLogGroup logGroup;
private final double copyThroughputThresholdBytesPerMs;
// the timestamp (in future) at which we will attempt to set the HAGroup
state to SYNC
@@ -134,7 +140,7 @@ public class ReplicationLogDiscoveryForwarder extends
ReplicationLogDiscovery {
FileSystem srcFS = replicationLogTracker.getFileSystem();
FileStatus srcStat = srcFS.getFileStatus(src);
long ts = replicationLogTracker.getFileTimestamp(srcStat.getPath());
- ReplicationShardDirectoryManager remoteShardManager =
logGroup.getPeerShardManager();
+ ReplicationShardDirectoryManager remoteShardManager =
logGroup.getOrCreatePeerShardManager();
Path dst = remoteShardManager.getWriterPath(ts,
logGroup.getServerName().getServerName());
long startTime = EnvironmentEdgeManager.currentTimeMillis();
FileUtil.copy(srcFS, srcStat, remoteShardManager.getFileSystem(), dst,
false, false, conf);
@@ -227,4 +233,10 @@ public class ReplicationLogDiscoveryForwarder extends
ReplicationLogDiscovery {
return
getConf().getDouble(REPLICATION_FORWARDER_WAITING_BUFFER_PERCENTAGE_KEY,
DEFAULT_WAITING_BUFFER_PERCENTAGE);
}
+
+ @Override
+ public double getInProgressDirectoryProcessProbability() {
+ return
getConf().getDouble(REPLICATION_FORWARDER_IN_PROGRESS_PROCESSING_PROBABILITY_KEY,
+ super.getInProgressDirectoryProcessProbability());
+ }
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index c88d9b2647..ded1fec7b1 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -36,6 +36,7 @@ import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -165,6 +166,9 @@ public class ReplicationLogGroup {
public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
"phoenix.replication.log.retry.delay.ms";
public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
+ public static final String REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY =
+ "phoenix.replication.log.peer.init.timeout.ms";
+ public static final long DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS =
10_000L;
public static final String WAL_SYNC_TIMEOUT_MS_KEY =
"hbase.regionserver.wal.sync.timeout";
public static final long DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000L;
@@ -180,10 +184,15 @@ public class ReplicationLogGroup {
protected final String haGroupName;
protected final HAGroupStoreManager haGroupStoreManager;
protected final MetricsReplicationLogGroupSource metrics;
- protected ReplicationShardDirectoryManager peerShardManager;
+ // Cached at init time — HDFS URLs (local and peer) are fixed for the
lifetime of this group.
+ // URL changes require RS restart.
+ protected HAGroupStoreRecord haGroupStoreRecord;
protected ReplicationShardDirectoryManager localShardManager;
+ protected volatile ReplicationShardDirectoryManager peerShardManager;
+ private CompletableFuture<ReplicationShardDirectoryManager>
peerShardManagerFuture;
protected ReplicationLogDiscoveryForwarder logForwarder;
protected long syncTimeoutMs;
+ protected long peerInitTimeoutMs;
protected volatile boolean closed = false;
/**
@@ -331,20 +340,24 @@ public class ReplicationLogGroup {
* @param serverName The server name
* @param haGroupName The HA Group name
* @return ReplicationLogGroup instance
- * @throws RuntimeException if initialization fails
+ * @throws IOException if initialization fails
*/
public static ReplicationLogGroup get(Configuration conf, ServerName
serverName,
- String haGroupName) {
- return INSTANCES.computeIfAbsent(haGroupName, k -> {
- try {
- ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName,
haGroupName);
- group.init();
- return group;
- } catch (IOException e) {
- LOG.error("Failed to create ReplicationLogGroup for HA Group: {}",
haGroupName, e);
- throw new RuntimeException(e);
- }
- });
+ String haGroupName) throws IOException {
+ try {
+ return INSTANCES.computeIfAbsent(haGroupName, k -> {
+ try {
+ ReplicationLogGroup group = new ReplicationLogGroup(conf,
serverName, haGroupName);
+ group.init();
+ return group;
+ } catch (IOException e) {
+ LOG.error("Failed to create ReplicationLogGroup for HA Group: {}",
haGroupName, e);
+ throw new UncheckedIOException(e);
+ }
+ });
+ } catch (UncheckedIOException e) {
+ throw e.getCause();
+ }
}
/**
@@ -354,21 +367,25 @@ public class ReplicationLogGroup {
* @param haGroupName The HA Group name
* @param haGroupStoreManager HA Group Store Manager instance
* @return ReplicationLogGroup instance
- * @throws RuntimeException if initialization fails
+ * @throws IOException if initialization fails
*/
public static ReplicationLogGroup get(Configuration conf, ServerName
serverName,
- String haGroupName, HAGroupStoreManager haGroupStoreManager) {
- return INSTANCES.computeIfAbsent(haGroupName, k -> {
- try {
- ReplicationLogGroup group =
- new ReplicationLogGroup(conf, serverName, haGroupName,
haGroupStoreManager);
- group.init();
- return group;
- } catch (IOException e) {
- LOG.error("Failed to create ReplicationLogGroup for HA Group: {}",
haGroupName, e);
- throw new RuntimeException(e);
- }
- });
+ String haGroupName, HAGroupStoreManager haGroupStoreManager) throws
IOException {
+ try {
+ return INSTANCES.computeIfAbsent(haGroupName, k -> {
+ try {
+ ReplicationLogGroup group =
+ new ReplicationLogGroup(conf, serverName, haGroupName,
haGroupStoreManager);
+ group.init();
+ return group;
+ } catch (IOException e) {
+ LOG.error("Failed to create ReplicationLogGroup for HA Group: {}",
haGroupName, e);
+ throw new UncheckedIOException(e);
+ }
+ });
+ } catch (UncheckedIOException e) {
+ throw e.getCause();
+ }
}
/**
@@ -421,9 +438,10 @@ public class ReplicationLogGroup {
throw new IOException(message);
}
HAGroupStoreRecord record = haRecord.get();
- // First initialize the shard managers
- this.peerShardManager = createPeerShardManager(record);
- this.localShardManager = createLocalShardManager(record);
+ this.haGroupStoreRecord = record;
+ this.localShardManager = createLocalShardManager();
+ this.peerInitTimeoutMs =
conf.getLong(REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY,
+ DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS);
// Initialize the replication log forwarder. The log forwarder is only
activated when
// we switch to STORE_AND_FORWARD or SYNC_AND_FORWARD mode
this.logForwarder = new ReplicationLogDiscoveryForwarder(this);
@@ -783,21 +801,61 @@ public class ReplicationLogGroup {
}
}
- /** create shard manager for the standby cluster */
- protected ReplicationShardDirectoryManager
createPeerShardManager(HAGroupStoreRecord record)
- throws IOException {
- return createShardManager(record.getPeerHdfsUrl(), STANDBY_DIR);
+ /** create shard manager for the fallback cluster */
+ protected ReplicationShardDirectoryManager createLocalShardManager() throws
IOException {
+ return createShardManager(haGroupStoreRecord.getHdfsUrl(), FALLBACK_DIR);
}
- /** create shard manager for the fallback cluster */
- protected ReplicationShardDirectoryManager
createLocalShardManager(HAGroupStoreRecord record)
- throws IOException {
- return createShardManager(record.getHdfsUrl(), FALLBACK_DIR);
+ /**
+ * Get or create the peer shard manager. Thread-safe and idempotent — the
first successful
+ * creation is cached; subsequent calls return the cached instance. Bounded
by
+ * {@link #REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY} to prevent blocking the
disruptor handler
+ * thread on a peer NN outage.
+ */
+ protected ReplicationShardDirectoryManager getOrCreatePeerShardManager()
throws IOException {
+ ReplicationShardDirectoryManager cached = peerShardManager;
+ if (cached != null) {
+ return cached;
+ }
+ synchronized (this) {
+ if (peerShardManager != null) {
+ return peerShardManager;
+ }
+ if (peerShardManagerFuture == null ||
peerShardManagerFuture.isCompletedExceptionally()) {
+ // retry
+ peerShardManagerFuture = CompletableFuture.supplyAsync(() -> {
+ try {
+ return createPeerShardManager();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ }
+ try {
+ peerShardManager = peerShardManagerFuture.get(peerInitTimeoutMs,
TimeUnit.MILLISECONDS);
+ return peerShardManager;
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof UncheckedIOException) {
+ throw ((UncheckedIOException) cause).getCause();
+ }
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ throw new IOException("Failed to create peer shard manager", cause);
+ } catch (TimeoutException e) {
+ throw new IOException("Timed out creating peer shard manager after " +
peerInitTimeoutMs
+ + "ms for " + haGroupName, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while creating peer shard manager",
e);
+ }
+ }
}
- /** return shard manager for the standby cluster */
- protected ReplicationShardDirectoryManager getPeerShardManager() {
- return peerShardManager;
+ /** Create a new peer shard manager for the standby cluster */
+ protected ReplicationShardDirectoryManager createPeerShardManager() throws
IOException {
+ return createShardManager(haGroupStoreRecord.getPeerHdfsUrl(),
STANDBY_DIR);
}
/** return shard manager for the fallback cluster */
@@ -809,14 +867,10 @@ public class ReplicationLogGroup {
return FileSystem.get(uri, conf);
}
- /** Create the standby(synchronous) writer */
- protected ReplicationLog createStandbyLog() throws IOException {
- return new ReplicationLog(this, peerShardManager);
- }
-
- /** Create the fallback writer */
- protected ReplicationLog createFallbackLog() throws IOException {
- return new ReplicationLog(this, localShardManager);
+ /** Create a replication log using the given shard manager */
+ protected ReplicationLog
createReplicationLog(ReplicationShardDirectoryManager shardManager)
+ throws IOException {
+ return new ReplicationLog(this, shardManager);
}
/** Returns the log forwarder for this replication group */
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
index ea18a5b853..7bcb4c89e0 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
@@ -55,7 +55,7 @@ public class StoreAndForwardModeImpl extends
ReplicationModeImpl {
void onEnter() throws IOException {
LOG.info("HAGroup {} entered mode {}", logGroup, this);
// create a log on the fallback cluster
- log = logGroup.createFallbackLog();
+ log = logGroup.createReplicationLog(logGroup.getLocalShardManager());
log.init();
// Schedule task to periodically set the HAGroupStore state to
ACTIVE_NOT_IN_SYNC
startHAGroupStoreUpdateTask();
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
index aefce975cf..02c57f7f3f 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
@@ -44,7 +44,8 @@ public class SyncAndForwardModeImpl extends
ReplicationModeImpl {
void onEnter() throws IOException {
LOG.info("HAGroup {} entered mode {}", logGroup, this);
// create a log on the standby cluster
- log = logGroup.createStandbyLog();
+ ReplicationShardDirectoryManager peerShardManager =
logGroup.getOrCreatePeerShardManager();
+ log = logGroup.createReplicationLog(peerShardManager);
log.init();
// no-op if the forwarder is already started
logGroup.getLogForwarder().start();
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
index 75e175ad4e..0a5b5a48c6 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
@@ -43,7 +43,8 @@ public class SyncModeImpl extends ReplicationModeImpl {
void onEnter() throws IOException {
LOG.info("HAGroup {} entered mode {}", logGroup, this);
// create a log on the standby cluster
- log = logGroup.createStandbyLog();
+ ReplicationShardDirectoryManager peerShardManager =
logGroup.getOrCreatePeerShardManager();
+ log = logGroup.createReplicationLog(peerShardManager);
log.init();
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
index 949c3f2029..651cddde6c 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
@@ -124,7 +124,7 @@ public class ReplicationLogGroupIT extends HABaseIT {
LogFileAnalyzer analyzer = new LogFileAnalyzer();
// use peer cluster conf
analyzer.setConf(conf2);
- Path standByLogDir = logGroup.getPeerShardManager().getRootDirectoryPath();
+ Path standByLogDir =
logGroup.getOrCreatePeerShardManager().getRootDirectoryPath();
LOG.info("Analyzing log files at {}", standByLogDir);
String[] args = { "--check", standByLogDir.toString() };
assertEquals(0, analyzer.run(args));
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
index f748e3d1a8..d49e2ffc94 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogBaseTest.java
@@ -165,13 +165,9 @@ public class ReplicationLogBaseTest {
}
@Override
- protected ReplicationLog createStandbyLog() throws IOException {
- return spy(new TestableLog(this, peerShardManager, useAlignedRotation));
- }
-
- @Override
- protected ReplicationLog createFallbackLog() throws IOException {
- return spy(new TestableLog(this, localShardManager, useAlignedRotation));
+ protected ReplicationLog
createReplicationLog(ReplicationShardDirectoryManager shardManager)
+ throws IOException {
+ return spy(new TestableLog(this, shardManager, useAlignedRotation));
}
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
index 46efbf196e..65fb764a31 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogDiscoveryForwarderTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
import java.io.IOException;
import java.util.concurrent.Callable;
@@ -179,4 +180,56 @@ public class ReplicationLogDiscoveryForwarderTest extends
ReplicationLogBaseTest
}
assertEquals(SYNC, logGroup.getMode());
}
+
+ /**
+ * Tests that the forwarder retries peer shard manager creation when the
peer is initially
+ * unavailable. On the first attempt, getOrCreatePeerShardManager throws;
the file is marked
+ * failed and retried via in-progress processing. On the retry the peer
becomes available and
+ * forwarding succeeds.
+ */
+ @Test
+ public void testForwarderRetriesPeerCreation() throws Exception {
+ final String tableName = "TBLFWDRETRY";
+ final long count = 10L;
+
+ // Ensure in-progress files are immediately eligible for retry and always
processed
+
conf.setInt(ReplicationLogDiscovery.REPLICATION_IN_PROGRESS_FILE_MIN_AGE_SECONDS_KEY,
0);
+ conf.setDouble(
+
ReplicationLogDiscoveryForwarder.REPLICATION_FORWARDER_IN_PROGRESS_PROCESSING_PROBABILITY_KEY,
+ 100.0);
+ // Recreate the log group with the updated config
+ recreateLogGroup();
+ assertEquals(STORE_AND_FORWARD, logGroup.getMode());
+
+ // Make getOrCreatePeerShardManager fail on the first call, then succeed
on subsequent calls
+ doThrow(new IOException("Peer namenode unavailable")).doCallRealMethod().when(logGroup)
+ .getOrCreatePeerShardManager();
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) {
+ logGroup.setMode(SYNC);
+ try {
+ logGroup.sync();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return 0L;
+ }
+ }).when(haGroupStoreManager).setHAGroupStatusToSync(haGroupName);
+
+ // Write some data so the forwarder has files to process
+ for (long id = 1; id <= count; ++id) {
+ Mutation put = LogFileTestUtil.newPut("row_" + id, id, 2);
+ logGroup.append(tableName, id, put);
+ }
+ logGroup.sync();
+
+ // Wait for the forwarder to eventually succeed after retrying peer
creation
+ long deadline = EnvironmentEdgeManager.currentTimeMillis() + 120_000;
+ while (logGroup.getMode() != SYNC &&
EnvironmentEdgeManager.currentTimeMillis() < deadline) {
+ Thread.sleep(500);
+ }
+ assertEquals(SYNC, logGroup.getMode());
+ }
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 727959541e..2bf723defd 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -1709,7 +1709,8 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
doAnswer(poisonNewWriter).when(log).createNewWriter();
return log;
};
- doAnswer(poisonLog).when(logGroup).createFallbackLog();
+ doAnswer(poisonLog).when(logGroup)
+ .createReplicationLog(any(ReplicationShardDirectoryManager.class));
// Poison the already-initialized SYNC log
ReplicationLog activeLog = logGroup.getActiveLog();
@@ -1793,4 +1794,83 @@ public class ReplicationLogGroupTest extends
ReplicationLogBaseTest {
recreateLogGroup();
assertEquals("Explicit override should take precedence", 5000L,
logGroup.syncTimeoutMs);
}
+
+ /**
+ * Tests that when the peer cluster is unavailable at startup, the group
degrades from SYNC to
+ * STORE_AND_FORWARD and remains functional (append/sync work).
+ */
+ @Test
+ public void testInitDegradesToSafWhenPeerUnavailable() throws Exception {
+ final String tableName = "TBLDEG";
+ final long commitId = 1L;
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+ ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName,
haGroupName,
+ haGroupStoreManager, useAlignedRotation()));
+ doThrow(new IOException("Standby namenode unavailable")).when(group)
+ .getOrCreatePeerShardManager();
+ group.init();
+
+ try {
+ // Should have degraded to STORE_AND_FORWARD
+ assertEquals(STORE_AND_FORWARD, group.getMode());
+
+ // Verify the group is functional — append and sync should work via
local shard manager
+ group.append(tableName, commitId, put);
+ group.sync();
+ } finally {
+ group.close();
+ }
+ }
+
+ /**
+ * Tests that when the local cluster is unavailable at startup, init fails
with IOException.
+ * Neither SYNC nor SAF mode can operate without a local shard manager.
+ */
+ @Test
+ public void testInitFailsWhenLocalUnavailable() throws Exception {
+ ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName,
haGroupName,
+ haGroupStoreManager, useAlignedRotation()));
+ doThrow(new IOException("Local namenode unavailable")).when(group).createLocalShardManager();
+ try {
+ group.init();
+ fail("Should have thrown IOException when local shard manager is unavailable");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("Local namenode unavailable"));
+ }
+ }
+
+ /**
+ * Tests that when peer shard manager creation exceeds the configured
timeout, the group degrades
+ * to STORE_AND_FORWARD instead of blocking the disruptor handler thread
indefinitely.
+ */
+ @Test
+ public void testInitDegradesToSafWhenPeerInitTimesOut() throws Exception {
+ final String tableName = "TBLTIMEOUT";
+ final long commitId = 1L;
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+ // Set a very short peer init timeout
+ conf.setLong(ReplicationLogGroup.REPLICATION_LOG_PEER_INIT_TIMEOUT_MS_KEY,
100L);
+
+ ReplicationLogGroup group = spy(new TestableLogGroup(conf, serverName,
haGroupName,
+ haGroupStoreManager, useAlignedRotation()));
+ // Make createPeerShardManager block longer than the configured timeout
+ doAnswer(invocation -> {
+ Thread.sleep(5000);
+ return invocation.callRealMethod();
+ }).when(group).createPeerShardManager();
+ group.init();
+
+ try {
+ // Should have degraded to STORE_AND_FORWARD due to timeout
+ assertEquals(STORE_AND_FORWARD, group.getMode());
+
+ // Verify the group is functional
+ group.append(tableName, commitId, put);
+ group.sync();
+ } finally {
+ group.close();
+ }
+ }
}