This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new 389ff74a74 PHOENIX-7640 Refactor ReplicationLog for HA Groups (#2197)
389ff74a74 is described below

commit 389ff74a74979d3e45f43362f076b4727eae6776
Author: Andrew Purtell <[email protected]>
AuthorDate: Thu Dec 18 11:12:24 2025 -0800

    PHOENIX-7640 Refactor ReplicationLog for HA Groups (#2197)
    
    Conflicts:
            
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
            
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
            
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
            
phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
---
 .../phoenix/replication/ReplicationLogGroup.java   | 408 ++++++++++++++
 ...tionLog.java => ReplicationLogGroupWriter.java} | 538 ++++--------------
 .../phoenix/replication/StandbyLogGroupWriter.java | 136 +++++
 .../replication/StoreAndForwardLogGroupWriter.java |  74 +++
 ....java => MetricsReplicationLogGroupSource.java} |  14 +-
 ...a => MetricsReplicationLogGroupSourceImpl.java} |  27 +-
 ...onLogTest.java => ReplicationLogGroupTest.java} | 603 +++++++++------------
 7 files changed, 1017 insertions(+), 783 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
new file mode 100644
index 0000000000..80f15bbbf0
--- /dev/null
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogGroupSource;
+import 
org.apache.phoenix.replication.metrics.MetricsReplicationLogGroupSourceImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ReplicationLogGroup manages a group of replication logs for a given HA 
Group.
+ * <p>
+ * This class provides an API for replication operations and delegates to 
either synchronous
+ * replication (StandbyLogGroupWriter) or store-and-forward replication
+ * (StoreAndForwardLogGroupWriter) based on the current replication mode.
+ * <p>
+ * Key features:
+ * <ul>
+ * <li>Manages multiple replication logs for an HA Group</li>
+ * <li>Provides append() and sync() API for higher layers</li>
+ * <li>Delegates to appropriate writer implementation based on replication 
mode</li>
+ * <li>Thread-safe operations</li>
+ * </ul>
+ * <p>
+ * The class delegates actual replication work to implementations of 
ReplicationLogGroupWriter:
+ * <ul>
+ * <li>StandbyLogGroupWriter: Synchronous replication to standby cluster</li>
+ * <li>StoreAndForwardLogGroupWriter: Local storage with forwarding when 
available</li>
+ * </ul>
+ */
+public class ReplicationLogGroup {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationLogGroup.class);
+
+  // Configuration constants from original ReplicationLog
+  public static final String REPLICATION_STANDBY_HDFS_URL_KEY =
+    "phoenix.replication.log.standby.hdfs.url";
+  public static final String REPLICATION_FALLBACK_HDFS_URL_KEY =
+    "phoenix.replication.log.fallback.hdfs.url";
+  public static final String REPLICATION_NUM_SHARDS_KEY = 
"phoenix.replication.log.shards";
+  public static final int DEFAULT_REPLICATION_NUM_SHARDS = 1000;
+  public static final int MAX_REPLICATION_NUM_SHARDS = 100000;
+  public static final String REPLICATION_LOG_ROTATION_TIME_MS_KEY =
+    "phoenix.replication.log.rotation.time.ms";
+  public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 
1000L;
+  public static final String REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY =
+    "phoenix.replication.log.rotation.size.bytes";
+  public static final long DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES = 256 * 
1024 * 1024L;
+  public static final String REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY =
+    "phoenix.replication.log.rotation.size.percentage";
+  public static final double DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE 
= 0.95;
+  public static final String REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY =
+    "phoenix.replication.log.compression";
+  public static final String DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM = 
"NONE";
+  public static final String REPLICATION_LOG_RINGBUFFER_SIZE_KEY =
+    "phoenix.replication.log.ringbuffer.size";
+  public static final int DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE = 1024 * 32;
+  public static final String REPLICATION_LOG_SYNC_TIMEOUT_KEY =
+    "phoenix.replication.log.sync.timeout.ms";
+  public static final long DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT = 1000 * 30;
+  public static final String REPLICATION_LOG_SYNC_RETRIES_KEY =
+    "phoenix.replication.log.sync.retries";
+  public static final int DEFAULT_REPLICATION_LOG_SYNC_RETRIES = 5;
+  public static final String REPLICATION_LOG_ROTATION_RETRIES_KEY =
+    "phoenix.replication.log.rotation.retries";
+  public static final int DEFAULT_REPLICATION_LOG_ROTATION_RETRIES = 5;
+  public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
+    "phoenix.replication.log.retry.delay.ms";
+  public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
+
+  public static final String SHARD_DIR_FORMAT = "%05d";
+  public static final String FILE_NAME_FORMAT = "%d-%s.plog";
+
+  /** Cache of ReplicationLogGroup instances by HA Group ID */
+  protected static final ConcurrentHashMap<String, ReplicationLogGroup> 
INSTANCES =
+    new ConcurrentHashMap<>();
+
+  protected final Configuration conf;
+  protected final ServerName serverName;
+  protected final String haGroupName;
+  protected ReplicationLogGroupWriter remoteWriter;
+  protected ReplicationLogGroupWriter localWriter;
+  protected ReplicationMode mode;
+  protected volatile boolean closed = false;
+  protected final MetricsReplicationLogGroupSource metrics;
+
+  /**
+   * Tracks the current replication mode of the ReplicationLog.
+   * <p>
+   * The replication mode determines how mutations are handled:
+   * <ul>
+   * <li>SYNC: Normal operation where mutations are written directly to the 
standby cluster's HDFS.
+   * This is the default and primary mode of operation.</li>
+   * <li>STORE_AND_FORWARD: Fallback mode when the standby cluster's HDFS is 
unavailable. Mutations
+   * are stored locally and will be forwarded when connectivity is 
restored.</li>
+   * <li>SYNC_AND_FORWARD: Transitional mode where new mutations are written 
directly to the standby
+   * cluster while concurrently draining the local queue of previously stored 
mutations.</li>
+   * </ul>
+   * <p>
+   * Mode transitions occur automatically based on the availability of the 
standby cluster's HDFS
+   * and the state of the local mutation queue.
+   */
+  protected enum ReplicationMode {
+    /**
+     * Normal operation where mutations are written directly to the standby 
cluster's HDFS. This is
+     * the default and primary mode of operation.
+     */
+    SYNC,
+
+    /**
+     * Fallback mode when the standby cluster's HDFS is unavailable. Mutations 
are stored locally
+     * and will be forwarded when connectivity is restored.
+     */
+    STORE_AND_FORWARD,
+
+    /**
+     * Transitional mode where new mutations are written directly to the 
standby cluster while
+     * concurrently draining the local queue of previously stored mutations. 
This mode is entered
+     * when connectivity to the standby cluster is restored and there are 
still mutations in the
+     * local queue.
+     */
+    SYNC_AND_FORWARD;
+  }
+
+  /**
+   * Get or create a ReplicationLogGroup instance for the given HA Group.
+   * @param conf        Configuration object
+   * @param serverName  The server name
+   * @param haGroupName The HA Group name
+   * @return ReplicationLogGroup instance
+   * @throws RuntimeException if initialization fails
+   */
+  public static ReplicationLogGroup get(Configuration conf, ServerName 
serverName,
+    String haGroupName) {
+    return INSTANCES.computeIfAbsent(haGroupName, k -> {
+      try {
+        ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, 
haGroupName);
+        group.init();
+        return group;
+      } catch (IOException e) {
+        LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", 
haGroupName, e);
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  /**
+   * Protected constructor for ReplicationLogGroup.
+   * @param conf        Configuration object
+   * @param serverName  The server name
+   * @param haGroupName The HA Group name
+   */
+  protected ReplicationLogGroup(Configuration conf, ServerName serverName, 
String haGroupName) {
+    this.conf = conf;
+    this.serverName = serverName;
+    this.haGroupName = haGroupName;
+    this.metrics = createMetricsSource();
+  }
+
+  /**
+   * Initialize the ReplicationLogGroup by creating the appropriate writer 
implementation.
+   * @throws IOException if initialization fails
+   */
+  protected void init() throws IOException {
+    // We need the local writer created first if we intend to fall back to it 
should the init
+    // of the remote writer fail.
+    localWriter = createLocalWriter();
+    // Initialize the remote writer and set the mode to SYNC. TODO: switch 
instead of set
+    mode = ReplicationMode.SYNC;
+    remoteWriter = createRemoteWriter();
+    // TODO: Switch the initial mode to STORE_AND_FORWARD if the remote writer 
fails to
+    // initialize.
+    LOG.info("Started ReplicationLogGroup for HA Group: {}", haGroupName);
+  }
+
+  /**
+   * Get the name for this HA Group.
+   * @return The name for this HA Group
+   */
+  public String getHaGroupName() {
+    return haGroupName;
+  }
+
+  protected Configuration getConfiguration() {
+    return conf;
+  }
+
+  protected ServerName getServerName() {
+    return serverName;
+  }
+
+  /**
+   * Append a mutation to the replication log group. This operation is 
normally non-blocking unless
+   * the ring buffer is full.
+   * @param tableName The name of the HBase table the mutation applies to
+   * @param commitId  The commit identifier (e.g., SCN) associated with the 
mutation
+   * @param mutation  The HBase Mutation (Put or Delete) to be logged
+   * @throws IOException If the operation fails
+   */
+  public void append(String tableName, long commitId, Mutation mutation) 
throws IOException {
+    if (closed) {
+      throw new IOException("Closed");
+    }
+    long startTime = System.nanoTime();
+    try {
+      switch (mode) {
+        case SYNC:
+          // In sync mode, we only write to the remote writer.
+          try {
+            remoteWriter.append(tableName, commitId, mutation);
+          } catch (IOException e) {
+            // TODO: If the remote writer fails, we must switch to store and 
forward.
+            LOG.warn("Mode switching not implemented");
+            throw e;
+          }
+          break;
+        case SYNC_AND_FORWARD:
+          // In sync and forward mode, we write to only the remote writer, 
while in the
+          // background we are draining the local queue.
+          try {
+            remoteWriter.append(tableName, commitId, mutation);
+          } catch (IOException e) {
+            // TODO: If the remote writer fails again, we must switch back to 
store and
+            // forward.
+            LOG.warn("Mode switching not implemented");
+            throw e;
+          }
+          break;
+        case STORE_AND_FORWARD:
+          // In store and forward mode, we append to the local writer. If we 
fail it's a
+          // critical failure.
+          localWriter.append(tableName, commitId, mutation);
+          // TODO: Probe the state of the remoteWriter. Can we switch back?
+          // TODO: This suggests the ReplicationLogGroupWriter interface 
should have a status
+          // probe API.
+          break;
+        default:
+          throw new IllegalStateException("Invalid replication mode: " + mode);
+      }
+    } finally {
+      metrics.updateAppendTime(System.nanoTime() - startTime);
+    }
+  }
+
+  /**
+   * Ensure all previously appended records are durably persisted. This method 
blocks until the sync
+   * operation completes or fails.
+   * @throws IOException If the sync operation fails
+   */
+  public void sync() throws IOException {
+    if (closed) {
+      throw new IOException("Closed");
+    }
+    long startTime = System.nanoTime();
+    try {
+      switch (mode) {
+        case SYNC:
+          // In sync mode, we only write to the remote writer.
+          try {
+            remoteWriter.sync();
+          } catch (IOException e) {
+            // TODO: If the remote writer fails, we must switch to store and 
forward.
+            LOG.warn("Mode switching not implemented");
+            throw e;
+          }
+          break;
+        case SYNC_AND_FORWARD:
+          // In sync and forward mode, we write to only the remote writer, 
while in the
+          // background we are draining the local queue.
+          try {
+            remoteWriter.sync();
+          } catch (IOException e) {
+            // TODO: If the remote writer fails again, we must switch back to 
store and
+            // forward.
+            LOG.warn("Mode switching not implemented");
+            throw e;
+          }
+          break;
+        case STORE_AND_FORWARD:
+          // In store and forward mode, we sync the local writer. If we fail 
it's a critical
+          // failure.
+          localWriter.sync();
+          // TODO: Probe the state of the remoteWriter. Can we switch back?
+          // TODO: This suggests the ReplicationLogGroupWriter interface 
should have a
+          // status probe API.
+          break;
+        default:
+          throw new IllegalStateException("Invalid replication mode: " + mode);
+      }
+    } finally {
+      metrics.updateSyncTime(System.nanoTime() - startTime);
+    }
+  }
+
+  /**
+   * Check if this ReplicationLogGroup is closed.
+   * @return true if closed, false otherwise
+   */
+  public boolean isClosed() {
+    return closed;
+  }
+
+  /**
+   * Close the ReplicationLogGroup and all associated resources. This method 
is thread-safe and can
+   * be called multiple times.
+   */
+  public void close() {
+    if (closed) {
+      return;
+    }
+    synchronized (this) {
+      if (closed) {
+        return;
+      }
+      closed = true;
+      // Remove from instances cache
+      INSTANCES.remove(haGroupName);
+      // Close the writers, remote first. If there are any problems closing 
the remote writer
+      // the pending writes will be sent to the local writer instead, during 
the appropriate
+      // mode switch.
+      closeWriter(remoteWriter);
+      closeWriter(localWriter);
+      metrics.close();
+      LOG.info("Closed ReplicationLogGroup for HA Group: {}", haGroupName);
+    }
+  }
+
+  /**
+   * Switch the replication mode.
+   * @param mode   The new replication mode
+   * @param reason The reason for the mode switch
+   * @throws IOException If the mode switch fails
+   */
+  public void switchMode(ReplicationMode mode, Throwable reason) throws 
IOException {
+    // TODO: Implement mode switching guardrails and transition logic.
+    // TODO: We will be interacting with the HA Group Store to switch modes.
+
+    // TODO: Drain the disruptor ring from the remote writer to the local 
writer when making
+    // transitions from SYNC or SYNC_AND_FORWARD to STORE_AND_FORWARD.
+
+    throw new UnsupportedOperationException("Mode switching is not 
implemented");
+  }
+
+  /** Get the current metrics source for monitoring operations. */
+  public MetricsReplicationLogGroupSource getMetrics() {
+    return metrics;
+  }
+
+  /** Create a new metrics source for monitoring operations. */
+  protected MetricsReplicationLogGroupSource createMetricsSource() {
+    return new MetricsReplicationLogGroupSourceImpl(haGroupName);
+  }
+
+  /** Close the given writer. */
+  protected void closeWriter(ReplicationLogGroupWriter writer) {
+    if (writer != null) {
+      writer.close();
+    }
+  }
+
+  /** Create the remote (synchronous) writer. Mainly for tests. */
+  protected ReplicationLogGroupWriter createRemoteWriter() throws IOException {
+    ReplicationLogGroupWriter writer = new StandbyLogGroupWriter(this);
+    writer.init();
+    return writer;
+  }
+
+  /** Create the local (store and forward) writer. Mainly for tests. */
+  protected ReplicationLogGroupWriter createLocalWriter() throws IOException {
+    ReplicationLogGroupWriter writer = new StoreAndForwardLogGroupWriter(this);
+    writer.init();
+    return writer;
+  }
+
+  /** Returns the currently active writer. Mainly for tests. */
+  protected ReplicationLogGroupWriter getActiveWriter() {
+    switch (mode) {
+      case SYNC:
+        return remoteWriter;
+      case SYNC_AND_FORWARD:
+        return remoteWriter;
+      case STORE_AND_FORWARD:
+        return localWriter;
+      default:
+        throw new IllegalStateException("Invalid replication mode: " + mode);
+    }
+  }
+}
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
similarity index 60%
rename from 
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
rename to 
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
index 3a2d9567a0..b76b8fab34 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
@@ -27,11 +27,9 @@ import com.lmax.disruptor.dsl.ProducerType;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -41,59 +39,27 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.phoenix.replication.log.LogFileWriter;
-import org.apache.phoenix.replication.log.LogFileWriterContext;
-import org.apache.phoenix.replication.metrics.MetricsReplicationLogSource;
-import org.apache.phoenix.replication.metrics.MetricsReplicationLogSourceImpl;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import 
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
- * The ReplicationLog implements a high-performance logging system for 
mutations that need to be
- * replicated to another cluster.
+ * Base class for replication log group writers.
  * <p>
- * Key features:
- * <ul>
- * <li>Asynchronous append operations with batching for high throughput</li>
- * <li>Controlled blocking sync operations with timeout and retry logic</li>
- * <li>Automatic log rotation based on time and size thresholds</li>
- * <li>Sharded directory structure for better HDFS performance</li>
- * <li>Metrics for monitoring</li>
- * <li>Fail-stop behavior on critical errors</li>
- * </ul>
- * <p>
- * The class supports three replication modes (though currently only SYNC is 
implemented):
- * <ul>
- * <li>SYNC: Direct writes to standby cluster (default)</li>
- * <li>STORE_AND_FORWARD: Local storage when standby is unavailable</li>
- * <li>SYNC_AND_FORWARD: Concurrent direct writes and queue draining</li>
- * </ul>
- * <p>
- * Key configuration properties:
- * <ul>
- * <li>{@link #REPLICATION_STANDBY_HDFS_URL_KEY}: URL for standby cluster 
HDFS</li>
- * <li>{@link #REPLICATION_FALLBACK_HDFS_URL_KEY}: URL for local fallback 
storage</li>
- * <li>{@link #REPLICATION_NUM_SHARDS_KEY}: Number of shard directories</li>
- * <li>{@link #REPLICATION_LOG_ROTATION_TIME_MS_KEY}: Time-based rotation 
interval</li>
- * <li>{@link #REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY}: Size-based rotation 
threshold</li>
- * <li>{@link #REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY}: Compression 
algorithm</li>
- * </ul>
- * <p>
- * This class is intended to be thread-safe.
+ * This abstract class contains most of the common functionality for managing 
replication logs
+ * including the disruptor ring buffer, log rotation, file system management, 
and metrics. Concrete
+ * implementations provide specific replication behavior (synchronous vs 
store-and- forward).
  * <p>
  * Architecture Overview:
  *
  * <pre>
  * ┌──────────────────────────────────────────────────────────────────────┐
- * │                           ReplicationLog                             │
+ * │                       ReplicationLogGroup                            │
  * │                                                                      │
  * │  ┌─────────────┐     ┌────────────────────────────────────────────┐  │
  * │  │             │     │                                            │  │
@@ -138,129 +104,26 @@ import 
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFac
  */
 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
     value = { "EI_EXPOSE_REP", "EI_EXPOSE_REP2", "MS_EXPOSE_REP" }, 
justification = "Intentional")
-public class ReplicationLog {
-
-  /** The path on the standby HDFS where log files should be written. (The 
"IN" directory.) */
-  public static final String REPLICATION_STANDBY_HDFS_URL_KEY =
-    "phoenix.replication.log.standby.hdfs.url";
-  /**
-   * The path on the active HDFS where log files should be written when we 
have fallen back to store
-   * and forward mode. (The "OUT" directory.)
-   */
-  public static final String REPLICATION_FALLBACK_HDFS_URL_KEY =
-    "phoenix.replication.log.fallback.hdfs.url";
-  /**
-   * The number of shards (subfolders) to maintain in the "IN" directory.
-   * <p>
-   * Shard directories have the format shard-NNNNN, e.g. shard-00001. The 
maximum value is 100000.
-   */
-  public static final String REPLICATION_NUM_SHARDS_KEY = 
"phoenix.replication.log.shards";
-  public static final int DEFAULT_REPLICATION_NUM_SHARDS = 1000;
-  public static final int MAX_REPLICATION_NUM_SHARDS = 100000;
-  /** Replication log rotation time trigger, default is 1 minute */
-  public static final String REPLICATION_LOG_ROTATION_TIME_MS_KEY =
-    "phoenix.replication.log.rotation.time.ms";
-  public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 
1000L;
-  /** Replication log rotation size trigger, default is 256 MB */
-  public static final String REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY =
-    "phoenix.replication.log.rotation.size.bytes";
-  public static final long DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES = 256 * 
1024 * 1024L;
-  /** Replication log rotation size trigger percentage, default is 0.95 */
-  public static final String REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY =
-    "phoenix.replication.log.rotation.size.percentage";
-  public static final double DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE 
= 0.95;
-  /** Replication log compression, default is "NONE" */
-  public static final String REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY =
-    "phoenix.replication.log.compression";
-  public static final String DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM = 
"NONE";
-  public static final String REPLICATION_LOG_RINGBUFFER_SIZE_KEY =
-    "phoenix.replication.log.ringbuffer.size";
-  public static final int DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE = 1024 * 32; 
// Too big?
-  public static final String REPLICATION_LOG_SYNC_TIMEOUT_KEY =
-    "phoenix.replication.log.sync.timeout.ms";
-  public static final long DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT = 1000 * 30;
-  public static final String REPLICATION_LOG_SYNC_RETRIES_KEY =
-    "phoenix.replication.log.sync.retries";
-  public static final int DEFAULT_REPLICATION_LOG_SYNC_RETRIES = 5;
-  public static final String REPLICATION_LOG_ROTATION_RETRIES_KEY =
-    "phoenix.replication.log.rotation.retries";
-  public static final int DEFAULT_REPLICATION_LOG_ROTATION_RETRIES = 5;
-  public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
-    "phoenix.replication.log.retry.delay.ms";
-  public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
-
-  public static final String SHARD_DIR_FORMAT = "shard%05d";
-  public static final String FILE_NAME_FORMAT = "%d-%s.plog";
-
-  static final byte EVENT_TYPE_DATA = 0;
-  static final byte EVENT_TYPE_SYNC = 1;
-
-  static final Logger LOG = LoggerFactory.getLogger(ReplicationLog.class);
+public abstract class ReplicationLogGroupWriter {
 
-  protected static volatile ReplicationLog instance;
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationLogGroupWriter.class);
 
-  protected final Configuration conf;
-  protected final ServerName serverName;
-  protected FileSystem standbyFs;
-  protected FileSystem fallbackFs; // For store-and-forward (future use)
-  protected int numShards;
-  protected URI standbyUrl;
-  protected URI fallbackUrl; // For store-and-forward (future use)
+  protected final ReplicationLogGroup logGroup;
   protected final long rotationTimeMs;
   protected final long rotationSizeBytes;
   protected final int maxRotationRetries;
   protected final Compression.Algorithm compression;
+  protected final int ringBufferSize;
+  protected final long syncTimeoutMs;
   protected final ReentrantLock lock = new ReentrantLock();
-  protected volatile LogFileWriter currentWriter; // Current writer
+  protected volatile LogFileWriter currentWriter;
   protected final AtomicLong lastRotationTime = new AtomicLong();
   protected final AtomicLong writerGeneration = new AtomicLong();
   protected final AtomicLong rotationFailures = new AtomicLong(0);
   protected ScheduledExecutorService rotationExecutor;
-  protected final int ringBufferSize;
-  protected final long syncTimeoutMs;
   protected Disruptor<LogEvent> disruptor;
   protected RingBuffer<LogEvent> ringBuffer;
-  protected final ConcurrentHashMap<Path, Object> shardMap = new 
ConcurrentHashMap<>();
-  protected final MetricsReplicationLogSource metrics;
-  protected volatile boolean isClosed = false;
-
-  /**
-   * Tracks the current replication mode of the ReplicationLog.
-   * <p>
-   * The replication mode determines how mutations are handled:
-   * <ul>
-   * <li>SYNC: Normal operation where mutations are written directly to the 
standby cluster's HDFS.
-   * This is the default and primary mode of operation.</li>
-   * <li>STORE_AND_FORWARD: Fallback mode when the standby cluster's HDFS is 
unavailable. Mutations
-   * are stored locally and will be forwarded when connectivity is 
restored.</li>
-   * <li>SYNC_AND_FORWARD: Transitional mode where new mutations are written 
directly to the standby
-   * cluster while concurrently draining the local queue of previously stored 
mutations.</li>
-   * </ul>
-   * <p>
-   * Mode transitions occur automatically based on the availability of the 
standby cluster's HDFS
-   * and the state of the local mutation queue.
-   */
-  protected enum ReplicationMode {
-    /**
-     * Normal operation where mutations are written directly to the standby 
cluster's HDFS. This is
-     * the default and primary mode of operation.
-     */
-    SYNC,
-
-    /**
-     * Fallback mode when the standby cluster's HDFS is unavailable. Mutations 
are stored locally
-     * and will be forwarded when connectivity is restored.
-     */
-    STORE_AND_FORWARD,
-
-    /**
-     * Transitional mode where new mutations are written directly to the 
standby cluster while
-     * concurrently draining the local queue of previously stored mutations. 
This mode is entered
-     * when connectivity to the standby cluster is restored while there are 
still mutations in the
-     * local queue.
-     */
-    SYNC_AND_FORWARD;
-  }
+  protected volatile boolean closed = false;
 
   /** The reason for requesting a log rotation. */
   protected enum RotationReason {
@@ -272,89 +135,28 @@ public class ReplicationLog {
     ERROR;
   }
 
-  /**
-   * The current replication mode. Always SYNC for now.
-   * <p>
-   * TODO: Implement mode transitions to STORE_AND_FORWARD when standby 
becomes unavailable.
-   * <p>
-   * TODO: Implement mode transitions to SYNC_AND_FORWARD when draining queue.
-   */
-  protected volatile ReplicationMode currentMode = ReplicationMode.SYNC;
-
-  // TODO: Add configuration keys for store-and-forward behavior
-  // - Maximum retry attempts before switching to store-and-forward
-  // - Retry delay between attempts
-  // - Queue drain batch size
-  // - Queue drain interval
-
-  // TODO: Add state tracking fields
-  // - Queue of pending changes when in store-and-forward mode
-  // - Timestamp of last successful standby write
-  // - Error count for tracking consecutive failures
-
-  // TODO: Add methods for state transitions
-  // - switchToStoreAndForward() - Called when standby becomes unavailable
-  // - switchToSync() - Called when standby becomes available again
-  // - drainQueue() - Background task to process queued changes
-
-  // TODO: Enhance error handling in LogEventHandler
-  // - Track consecutive failures
-  // - Switch to store-and-forward after max retries
-
-  // TODO: Implement queue management for store-and-forward mode
-  // - Implement queue persistence to handle RegionServer restarts
-  // - Implement queue draining when in SYNC_AND_FORWARD state
-  // - Implement automatic recovery from temporary network issues
-  // - Add configurable thresholds for switching to store-and-forward based on 
write latency
-  // - Add circuit breaker pattern to prevent overwhelming the standby cluster
-  // - Add queue size limits and backpressure mechanisms
-  // - Add queue metrics for monitoring (queue size, oldest entry age, etc.)
-
-  // TODO: Enhance metrics for replication health monitoring
-  // - Add metrics for replication lag between active and standby
-  // - Track time spent in each replication mode (SYNC, STORE_AND_FORWARD, 
SYNC_AND_FORWARD)
-  // - Monitor queue drain rate and backlog size
-  // - Track consecutive failures and mode transition events
-
-  /**
-   * Gets the singleton instance of the ReplicationLogManager using the lazy 
initializer pattern.
-   * Initializes the instance if it hasn't been created yet.
-   * @param conf       Configuration object.
-   * @param serverName The server name.
-   * @return The singleton ReplicationLogManager instance.
-   * @throws IOException If initialization fails.
-   */
-  public static ReplicationLog get(Configuration conf, ServerName serverName) 
throws IOException {
-    if (instance == null) {
-      synchronized (ReplicationLog.class) {
-        if (instance == null) {
-          // Complete initialization before assignment
-          ReplicationLog logManager = new ReplicationLog(conf, serverName);
-          logManager.init();
-          instance = logManager;
-        }
-      }
-    }
-    return instance;
-  }
-
-  protected ReplicationLog(Configuration conf, ServerName serverName) {
-    this.conf = conf;
-    this.serverName = serverName;
-    this.rotationTimeMs =
-      conf.getLong(REPLICATION_LOG_ROTATION_TIME_MS_KEY, 
DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS);
-    long rotationSize = conf.getLong(REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
-      DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES);
-    double rotationSizePercent = 
conf.getDouble(REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY,
-      DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE);
+  protected static final byte EVENT_TYPE_DATA = 0;
+  protected static final byte EVENT_TYPE_SYNC = 1;
+
+  protected ReplicationLogGroupWriter(ReplicationLogGroup logGroup) {
+    this.logGroup = logGroup;
+    Configuration conf = logGroup.getConfiguration();
+    this.rotationTimeMs = 
conf.getLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_TIME_MS_KEY,
+      ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS);
+    long rotationSize = 
conf.getLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
+      ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES);
+    double rotationSizePercent =
+      
conf.getDouble(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY,
+        ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE);
     this.rotationSizeBytes = (long) (rotationSize * rotationSizePercent);
-    this.maxRotationRetries =
-      conf.getInt(REPLICATION_LOG_ROTATION_RETRIES_KEY, 
DEFAULT_REPLICATION_LOG_ROTATION_RETRIES);
-    this.numShards = conf.getInt(REPLICATION_NUM_SHARDS_KEY, 
DEFAULT_REPLICATION_NUM_SHARDS);
-    String compressionName = 
conf.get(REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY,
-      DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM);
+    this.maxRotationRetries = 
conf.getInt(ReplicationLogGroup.REPLICATION_LOG_ROTATION_RETRIES_KEY,
+      ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES);
+    String compressionName = 
conf.get(ReplicationLogGroup.REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY,
+      ReplicationLogGroup.DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM);
     Compression.Algorithm compression = Compression.Algorithm.NONE;
-    if 
(!DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM.equalsIgnoreCase(compressionName))
 {
+    if (
+      
!compressionName.equals(ReplicationLogGroup.DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM)
+    ) {
       try {
         compression = 
Compression.getCompressionAlgorithmByName(compressionName);
       } catch (IllegalArgumentException e) {
@@ -362,51 +164,21 @@ public class ReplicationLog {
       }
     }
     this.compression = compression;
-    this.ringBufferSize =
-      conf.getInt(REPLICATION_LOG_RINGBUFFER_SIZE_KEY, 
DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE);
-    this.syncTimeoutMs =
-      conf.getLong(REPLICATION_LOG_SYNC_TIMEOUT_KEY, 
DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT);
-    this.metrics = createMetricsSource();
-  }
-
-  /** Creates a new metrics source for monitoring replication log operations. 
*/
-  protected MetricsReplicationLogSource createMetricsSource() {
-    return new MetricsReplicationLogSourceImpl();
-  }
-
-  /** Returns the metrics source for monitoring replication log operations. */
-  public MetricsReplicationLogSource getMetrics() {
-    return metrics;
+    this.ringBufferSize = 
conf.getInt(ReplicationLogGroup.REPLICATION_LOG_RINGBUFFER_SIZE_KEY,
+      ReplicationLogGroup.DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE);
+    this.syncTimeoutMs = 
conf.getLong(ReplicationLogGroup.REPLICATION_LOG_SYNC_TIMEOUT_KEY,
+      ReplicationLogGroup.DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT);
   }
 
-  @SuppressWarnings("unchecked")
+  /** Initialize the writer. */
   public void init() throws IOException {
-    if (numShards > MAX_REPLICATION_NUM_SHARDS) {
-      throw new IllegalArgumentException(REPLICATION_NUM_SHARDS_KEY + " is " + 
numShards
-        + ", but the limit is " + MAX_REPLICATION_NUM_SHARDS);
-    }
     initializeFileSystems();
     // Start time based rotation.
     lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
     startRotationExecutor();
-    // Create the initial writer. Do this before we call 
LogEventHandler.init().
-    currentWriter = createNewWriter(standbyFs, standbyUrl);
-    // Initialize the Disruptor. We use ProducerType.MULTI because multiple 
handlers might
-    // call append concurrently. We use YieldingWaitStrategy for low latency. 
When the ring
-    // buffer is full (controlled by REPLICATION_WRITER_RINGBUFFER_SIZE_KEY), 
producers
-    // calling ringBuffer.next() will effectively block (by 
yielding/spinning), creating
-    // backpressure on the callers. This ensures appends don't proceed until 
there is space.
-    disruptor = new Disruptor<>(
-      LogEvent.EVENT_FACTORY, ringBufferSize, new ThreadFactoryBuilder()
-        
.setNameFormat("ReplicationLogEventHandler-%d").setDaemon(true).build(),
-      ProducerType.MULTI, new YieldingWaitStrategy());
-    LogEventHandler eventHandler = new LogEventHandler();
-    eventHandler.init();
-    disruptor.handleEventsWith(eventHandler);
-    LogExceptionHandler exceptionHandler = new LogExceptionHandler();
-    disruptor.setDefaultExceptionHandler(exceptionHandler);
-    ringBuffer = disruptor.start();
-    LOG.info("ReplicationLogWriter started with ring buffer size {}", 
ringBufferSize);
+    // Create the initial writer. Do this before we initialize the Disruptor.
+    currentWriter = createNewWriter();
+    initializeDisruptor();
   }
 
   /**
@@ -428,10 +200,9 @@ public class ReplicationLog {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Append: table={}, commitId={}, mutation={}", tableName, 
commitId, mutation);
     }
-    if (isClosed) {
+    if (closed) {
       throw new IOException("Closed");
     }
-    long startTime = System.nanoTime();
     // ringBuffer.next() claims the next sequence number. Because we 
initialize the Disruptor
     // with ProducerType.MULTI and the blocking YieldingWaitStrategy this call 
WILL BLOCK if
     // the ring buffer is full, thus providing backpressure to the callers.
@@ -439,7 +210,6 @@ public class ReplicationLog {
     try {
       LogEvent event = ringBuffer.get(sequence);
       event.setValues(EVENT_TYPE_DATA, new Record(tableName, commitId, 
mutation), null);
-      metrics.updateAppendTime(System.nanoTime() - startTime);
     } finally {
       // Update ring buffer events metric
       ringBuffer.publish(sequence);
@@ -462,18 +232,44 @@ public class ReplicationLog {
    * @throws IOException If the sync operation fails after retries, or if 
interrupted.
    */
   public void sync() throws IOException {
-    if (isClosed) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Sync");
+    }
+    if (closed) {
       throw new IOException("Closed");
     }
     syncInternal();
   }
 
+  /** Initialize file systems needed by this writer implementation. */
+  protected abstract void initializeFileSystems() throws IOException;
+
+  /**
+   * Create a new log writer for rotation.
+   */
+  protected abstract LogFileWriter createNewWriter() throws IOException;
+
+  /** Initialize the Disruptor. */
+  @SuppressWarnings("unchecked")
+  protected void initializeDisruptor() throws IOException {
+    disruptor = new Disruptor<>(LogEvent.EVENT_FACTORY, ringBufferSize,
+      new ThreadFactoryBuilder()
+        .setNameFormat("ReplicationLogGroupWriter-" + 
logGroup.getHaGroupName() + "-%d")
+        .setDaemon(true).build(),
+      ProducerType.MULTI, new YieldingWaitStrategy());
+    LogEventHandler eventHandler = new LogEventHandler();
+    eventHandler.init();
+    disruptor.handleEventsWith(eventHandler);
+    LogExceptionHandler exceptionHandler = new LogExceptionHandler();
+    disruptor.setDefaultExceptionHandler(exceptionHandler);
+    ringBuffer = disruptor.start();
+  }
+
   /**
    * Internal implementation of sync that publishes a sync event to the ring 
buffer and waits for
    * completion.
    */
   protected void syncInternal() throws IOException {
-    long startTime = System.nanoTime();
     CompletableFuture<Void> syncFuture = new CompletableFuture<>();
     long sequence = ringBuffer.next();
     try {
@@ -486,10 +282,7 @@ public class ReplicationLog {
     try {
       // Wait for the event handler to process up to and including this sync 
event
       syncFuture.get(syncTimeoutMs, TimeUnit.MILLISECONDS);
-      metrics.updateSyncTime(System.nanoTime() - startTime);
     } catch (InterruptedException e) {
-      // Almost certainly the regionserver is shutting down or aborting.
-      // TODO: Do we need to do more here?
       Thread.currentThread().interrupt();
       throw new InterruptedIOException("Interrupted while waiting for sync");
     } catch (ExecutionException e) {
@@ -506,90 +299,32 @@ public class ReplicationLog {
     }
   }
 
-  /** Initializes the standby and fallback filesystems and creates their log 
directories. */
-  protected void initializeFileSystems() throws IOException {
-    String standbyUrlString = conf.get(REPLICATION_STANDBY_HDFS_URL_KEY);
-    if (standbyUrlString == null) {
-      throw new IOException(REPLICATION_STANDBY_HDFS_URL_KEY + " is not 
configured");
-    }
-    // Only validate that the URI is well formed. We should not assume the 
scheme must be
-    // "hdfs" because perhaps the operator will substitute another FileSystem 
implementation
-    // for DistributedFileSystem.
-    try {
-      this.standbyUrl = new URI(standbyUrlString);
-    } catch (URISyntaxException e) {
-      throw new IOException(REPLICATION_STANDBY_HDFS_URL_KEY + " is not 
valid", e);
-    }
-    String fallbackUrlString = conf.get(REPLICATION_FALLBACK_HDFS_URL_KEY);
-    if (fallbackUrlString != null) {
-      // Only validate that the URI is well formed, as above.
-      try {
-        this.fallbackUrl = new URI(fallbackUrlString);
-      } catch (URISyntaxException e) {
-        throw new IOException(REPLICATION_FALLBACK_HDFS_URL_KEY + " is not 
valid", e);
-      }
-      this.fallbackFs = getFileSystem(fallbackUrl);
-      Path fallbackLogDir = new Path(fallbackUrl.getPath());
-      if (!fallbackFs.exists(fallbackLogDir)) {
-        LOG.info("Creating directory {}", fallbackUrlString);
-        if (!this.fallbackFs.mkdirs(fallbackLogDir)) {
-          throw new IOException("Failed to create directory: " + 
fallbackUrlString);
-        }
-      }
-    } else {
-      // We support a synchronous replication only option if store-and-forward 
configuration
-      // keys are missing. This is outside the scope of the design spec but 
potentially
-      // useful for testing and also allows an operator to prefer failover 
consistency and
-      // simplicity over availability, even if that is not recommended. Log it 
at WARN level
-      // to focus appropriate attention. (Should it be ERROR?)
-      LOG.warn("Fallback not configured ({}), store-and-forward DISABLED.",
-        REPLICATION_FALLBACK_HDFS_URL_KEY);
-      this.fallbackFs = null;
-    }
-    // Configuration is sorted, and possibly store-and-forward directories 
have been created,
-    // now create the standby side directories as needed.
-    this.standbyFs = getFileSystem(standbyUrl);
-    Path standbyLogDir = new Path(standbyUrl.getPath());
-    if (!standbyFs.exists(standbyLogDir)) {
-      LOG.info("Creating directory {}", standbyUrlString);
-      if (!standbyFs.mkdirs(standbyLogDir)) {
-        throw new IOException("Failed to create directory: " + 
standbyUrlString);
-      }
-    }
-  }
-
-  /** Gets a FileSystem instance for the given URI using the current 
configuration. */
-  protected FileSystem getFileSystem(URI uri) throws IOException {
-    return FileSystem.get(uri, conf);
+  protected void startRotationExecutor() {
+    long rotationCheckInterval = getRotationCheckInterval(rotationTimeMs);
+    rotationExecutor = Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder()
+      .setNameFormat("ReplicationLogRotation-" + logGroup.getHaGroupName() + 
"-%d").setDaemon(true)
+      .build());
+    rotationExecutor.scheduleAtFixedRate(new LogRotationTask(), 
rotationCheckInterval,
+      rotationCheckInterval, TimeUnit.MILLISECONDS);
+    LOG.debug("Started rotation executor with interval {}ms", 
rotationCheckInterval);
   }
 
-  /** Calculates the interval for checking log rotation based on the 
configured rotation time. */
   protected long getRotationCheckInterval(long rotationTimeMs) {
-    long interval;
-    if (rotationTimeMs > 0) {
-      interval = rotationTimeMs / 4;
-    } else {
-      // If rotation time is not configured or invalid, use a sensible default 
like 10 seconds
-      interval = 10000L;
-    }
+    long interval = Math.max(10 * 1000L, Math.min(60 * 1000L, rotationTimeMs / 
10));
     return interval;
   }
 
-  /** Starts the background task for time-based log rotation. */
-  protected void startRotationExecutor() {
-    Preconditions.checkState(rotationExecutor == null, "Rotation executor 
already started");
-    rotationExecutor = Executors.newSingleThreadScheduledExecutor(
-      new 
ThreadFactoryBuilder().setNameFormat("ReplicationLogRotator-%d").setDaemon(true).build());
-    long interval = getRotationCheckInterval(rotationTimeMs);
-    rotationExecutor.scheduleWithFixedDelay(new LogRotationTask(), interval, 
interval,
-      TimeUnit.MILLISECONDS);
-  }
-
-  /** Stops the background task for time-based log rotation. */
   protected void stopRotationExecutor() {
     if (rotationExecutor != null) {
-      rotationExecutor.shutdownNow();
-      rotationExecutor = null;
+      rotationExecutor.shutdown();
+      try {
+        if (!rotationExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          rotationExecutor.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        rotationExecutor.shutdownNow();
+      }
     }
   }
 
@@ -664,7 +399,7 @@ public class ReplicationLog {
     try {
       // Try to get the new writer first. If it fails we continue using the 
current writer.
       // Increment the writer generation
-      LogFileWriter newWriter = createNewWriter(standbyFs, standbyUrl);
+      LogFileWriter newWriter = createNewWriter();
       LOG.debug("Created new writer: {}", newWriter);
       // Close the current writer
       if (currentWriter != null) {
@@ -674,23 +409,23 @@ public class ReplicationLog {
       currentWriter = newWriter;
       lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
       rotationFailures.set(0);
-      metrics.incrementRotationCount();
+      logGroup.getMetrics().incrementRotationCount();
       switch (reason) {
         case TIME:
-          metrics.incrementTimeBasedRotationCount();
+          logGroup.getMetrics().incrementTimeBasedRotationCount();
           break;
         case SIZE:
-          metrics.incrementSizeBasedRotationCount();
+          logGroup.getMetrics().incrementSizeBasedRotationCount();
           break;
         case ERROR:
-          metrics.incrementErrorBasedRotationCount();
+          logGroup.getMetrics().incrementErrorBasedRotationCount();
           break;
       }
     } catch (IOException e) {
       // If we fail to rotate the log, we increment the failure counter. If we 
have exceeded
       // the maximum number of retries, we close the log and throw the 
exception. Otherwise
       // we log a warning and continue.
-      metrics.incrementRotationFailureCount();
+      logGroup.getMetrics().incrementRotationFailureCount();
       long numFailures = rotationFailures.getAndIncrement();
       if (numFailures >= maxRotationRetries) {
         LOG.warn("Failed to rotate log (attempt {}/{}), closing log", 
numFailures,
@@ -706,57 +441,6 @@ public class ReplicationLog {
     return currentWriter;
   }
 
-  /**
-   * Creates a new log file path in a sharded directory structure based on 
server name and
-   * timestamp.
-   */
-  protected Path makeWriterPath(FileSystem fs, URI url) throws IOException {
-    long timestamp = EnvironmentEdgeManager.currentTimeMillis();
-    // To have all logs for a given regionserver appear in the same shard, 
hash only the
-    // serverName. However we expect some regionservers will have 
significantly more load than
-    // others so we instead distribute the logs over all of the shards 
randomly for a more even
-    // overall distribution by also hashing the timestamp.
-    int shard = (serverName.hashCode() ^ Long.hashCode(timestamp)) % numShards;
-    Path shardPath = new Path(url.getPath(), String.format(SHARD_DIR_FORMAT, 
shard));
-    // Ensure the shard directory exists. We track which shard directories we 
have probed or
-    // created to avoid a round trip to the namenode for repeats.
-    IOException[] exception = new IOException[1];
-    shardMap.computeIfAbsent(shardPath, p -> {
-      try {
-        if (!fs.exists(p)) {
-          if (!fs.mkdirs(p)) {
-            throw new IOException("Could not create path: " + p);
-          }
-        }
-      } catch (IOException e) {
-        exception[0] = e;
-      }
-      return p;
-    });
-    // If we faced an exception in computeIfAbsent, throw it
-    if (exception[0] != null) {
-      throw exception[0];
-    }
-    Path filePath = new Path(shardPath, String.format(FILE_NAME_FORMAT, 
timestamp, serverName));
-    return filePath;
-  }
-
-  /** Creates and initializes a new LogFileWriter for the given filesystem and 
URL. */
-  protected LogFileWriter createNewWriter(FileSystem fs, URI url) throws 
IOException {
-    Path filePath = makeWriterPath(fs, url);
-    LogFileWriterContext writerContext = new 
LogFileWriterContext(conf).setFileSystem(fs)
-      .setFilePath(filePath).setCompression(compression);
-    LogFileWriter newWriter = new LogFileWriter();
-    try {
-      newWriter.init(writerContext);
-      newWriter.setGeneration(writerGeneration.incrementAndGet());
-    } catch (IOException e) {
-      LOG.error("Failed to initialize new LogFileWriter for path {}", 
filePath, e);
-      throw e;
-    }
-    return newWriter;
-  }
-
   /** Closes the given writer, logging any errors that occur during close. */
   protected void closeWriter(LogFileWriter writer) {
     if (writer == null) {
@@ -770,6 +454,14 @@ public class ReplicationLog {
     }
   }
 
+  /**
+   * Check if this ReplicationLogGroup is closed.
+   * @return true if closed, false otherwise
+   */
+  public boolean isClosed() {
+    return closed;
+  }
+
   /**
    * Force closes the log upon an unrecoverable internal error. This is a 
fail-stop behavior: once
    * called, the log is marked as closed, the Disruptor is halted, and all 
subsequent append() and
@@ -779,10 +471,10 @@ public class ReplicationLog {
   protected void closeOnError() {
     lock.lock();
     try {
-      if (isClosed) {
+      if (closed) {
         return;
       }
-      isClosed = true;
+      closed = true;
     } finally {
       lock.unlock();
     }
@@ -799,10 +491,10 @@ public class ReplicationLog {
   public void close() {
     lock.lock();
     try {
-      if (isClosed) {
+      if (closed) {
         return;
       }
-      isClosed = true;
+      closed = true;
     } finally {
       lock.unlock();
     }
@@ -820,11 +512,15 @@ public class ReplicationLog {
     closeWriter(currentWriter);
   }
 
+  protected FileSystem getFileSystem(URI uri) throws IOException {
+    return FileSystem.get(uri, logGroup.getConfiguration());
+  }
+
   /** Implements time based rotation independent of in-line checking. */
   protected class LogRotationTask implements Runnable {
     @Override
     public void run() {
-      if (isClosed) {
+      if (closed) {
         return;
       }
       // Use tryLock with a timeout to avoid blocking indefinitely if another 
thread holds
@@ -837,7 +533,7 @@ public class ReplicationLog {
           // Check only the time condition here, size is handled by getWriter
           long now = EnvironmentEdgeManager.currentTimeMillis();
           long last = lastRotationTime.get();
-          if (!isClosed && now - last >= rotationTimeMs) {
+          if (!closed && now - last >= rotationTimeMs) {
             LOG.debug("Time based rotation needed ({} ms elapsed, threshold {} 
ms).", now - last,
               rotationTimeMs);
             try {
@@ -903,10 +599,11 @@ public class ReplicationLog {
     protected long generation;
 
     protected LogEventHandler() {
-      this.maxRetries =
-        conf.getInt(REPLICATION_LOG_SYNC_RETRIES_KEY, 
DEFAULT_REPLICATION_LOG_SYNC_RETRIES);
-      this.retryDelayMs =
-        conf.getLong(REPLICATION_LOG_RETRY_DELAY_MS_KEY, 
DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS);
+      Configuration conf = logGroup.getConfiguration();
+      this.maxRetries = 
conf.getInt(ReplicationLogGroup.REPLICATION_LOG_SYNC_RETRIES_KEY,
+        ReplicationLogGroup.DEFAULT_REPLICATION_LOG_SYNC_RETRIES);
+      this.retryDelayMs = 
conf.getLong(ReplicationLogGroup.REPLICATION_LOG_RETRY_DELAY_MS_KEY,
+        ReplicationLogGroup.DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS);
     }
 
     protected void init() throws IOException {
@@ -997,7 +694,7 @@ public class ReplicationLog {
       // Calculate time spent in ring buffer
       long currentTimeNs = System.nanoTime();
       long ringBufferTimeNs = currentTimeNs - event.timestampNs;
-      metrics.updateRingBufferTime(ringBufferTimeNs);
+      logGroup.getMetrics().updateRingBufferTime(ringBufferTimeNs);
       writer = getWriter();
       int attempt = 0;
       while (attempt < maxRetries) {
@@ -1081,5 +778,4 @@ public class ReplicationLog {
       closeOnError();
     }
   }
-
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StandbyLogGroupWriter.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StandbyLogGroupWriter.java
new file mode 100644
index 0000000000..5c75088608
--- /dev/null
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StandbyLogGroupWriter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Synchronous replication implementation of ReplicationLogGroupWriter.
+ * <p>
+ * This class implements synchronous replication to a standby cluster's HDFS. 
It writes replication
+ * logs directly to the standby cluster in synchronous mode, providing 
immediate consistency for
+ * failover scenarios.
+ */
+public class StandbyLogGroupWriter extends ReplicationLogGroupWriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(StandbyLogGroupWriter.class);
+
+  private FileSystem standbyFs;
+  private URI standbyUrl;
+  protected int numShards;
+  protected final ConcurrentHashMap<Path, Object> shardMap = new 
ConcurrentHashMap<>();
+
+  /**
+   * Constructor for StandbyLogGroupWriter.
+   */
+  public StandbyLogGroupWriter(ReplicationLogGroup logGroup) {
+    super(logGroup);
+    Configuration conf = logGroup.getConfiguration();
+    this.numShards = 
conf.getInt(ReplicationLogGroup.REPLICATION_NUM_SHARDS_KEY,
+      ReplicationLogGroup.DEFAULT_REPLICATION_NUM_SHARDS);
+    LOG.debug("Created StandbyLogGroupWriter for HA Group: {}", 
logGroup.getHaGroupName());
+  }
+
+  @Override
+  protected void initializeFileSystems() throws IOException {
+    if (numShards > ReplicationLogGroup.MAX_REPLICATION_NUM_SHARDS) {
+      throw new 
IllegalArgumentException(ReplicationLogGroup.REPLICATION_NUM_SHARDS_KEY + " is "
+        + numShards + ", but the limit is " + 
ReplicationLogGroup.MAX_REPLICATION_NUM_SHARDS);
+    }
+    Configuration conf = logGroup.getConfiguration();
+    String standbyUrlString = 
conf.get(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY);
+    if (standbyUrlString == null || standbyUrlString.trim().isEmpty()) {
+      throw new IOException(
+        "Standby HDFS URL not configured: " + 
ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY);
+    }
+    try {
+      standbyUrl = new URI(standbyUrlString);
+      standbyFs = getFileSystem(standbyUrl);
+      LOG.info("Initialized standby filesystem: {}", standbyUrl);
+    } catch (URISyntaxException e) {
+      throw new IOException("Invalid standby HDFS URL: " + standbyUrlString, 
e);
+    }
+  }
+
+  /**
+   * Creates a new log file path in a sharded directory structure based on 
server name and
+   * timestamp. The resulting path structure is
+   *
+   * <pre>
+   * [url]/[haGroupId]/[shard]/[timestamp]-[servername].plog
+   * </pre>
+   */
+  protected Path makeWriterPath(FileSystem fs, URI url) throws IOException {
+    Path haGroupPath = new Path(url.getPath(), logGroup.getHaGroupName());
+    long timestamp = EnvironmentEdgeManager.currentTimeMillis();
+    // To have all logs for a given regionserver appear in the same shard, 
hash only the
+    // serverName. However we expect some regionservers will have 
significantly more load than
+    // others so we instead distribute the logs over all of the shards 
randomly for a more even
+    // overall distribution by also hashing the timestamp.
+    int shard =
+      Math.floorMod(logGroup.getServerName().hashCode() ^ 
Long.hashCode(timestamp), numShards);
+    Path shardPath =
+      new Path(haGroupPath, 
String.format(ReplicationLogGroup.SHARD_DIR_FORMAT, shard));
+    // Ensure the shard directory exists. We track which shard directories we 
have probed or
+    // created to avoid a round trip to the namenode for repeats.
+    IOException[] exception = new IOException[1];
+    shardMap.computeIfAbsent(shardPath, p -> {
+      try {
+        if (!fs.exists(p)) {
+          fs.mkdirs(haGroupPath); // This probably exists, but just in case.
+          if (!fs.mkdirs(shardPath)) {
+            throw new IOException("Could not create path: " + p);
+          }
+        }
+      } catch (IOException e) {
+        exception[0] = e;
+        return null; // Don't cache the path if we can't create it.
+      }
+      return p;
+    });
+    // If we faced an exception in computeIfAbsent, throw it
+    if (exception[0] != null) {
+      throw exception[0];
+    }
+    Path filePath = new Path(shardPath,
+      String.format(ReplicationLogGroup.FILE_NAME_FORMAT, timestamp, 
logGroup.getServerName()));
+    return filePath;
+  }
+
+  /** Creates and initializes a new LogFileWriter. */
+  protected LogFileWriter createNewWriter() throws IOException {
+    Path filePath = makeWriterPath(standbyFs, standbyUrl);
+    LogFileWriterContext writerContext = new 
LogFileWriterContext(logGroup.getConfiguration())
+      
.setFileSystem(standbyFs).setFilePath(filePath).setCompression(compression);
+    LogFileWriter newWriter = new LogFileWriter();
+    newWriter.init(writerContext);
+    newWriter.setGeneration(writerGeneration.incrementAndGet());
+    return newWriter;
+  }
+}
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardLogGroupWriter.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardLogGroupWriter.java
new file mode 100644
index 0000000000..c491ff82d9
--- /dev/null
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardLogGroupWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Store-and-forward replication implementation of ReplicationLogGroupWriter.
+ * <p>
+ * This class is a stub implementation for future store-and-forward 
replication functionality.
+ * Store-and-forward mode is used when the standby cluster is temporarily 
unavailable - mutations
+ * are stored locally and forwarded when connectivity is restored.
+ * <p>
+ * Currently this is a stub that throws UnsupportedOperationException for the 
abstract methods.
+ * Future implementation will include:
+ * <ul>
+ * <li>Local storage of mutations when standby is unavailable</li>
+ * <li>Background forwarding when connectivity is restored</li>
+ * <li>Proper error handling and retry logic</li>
+ * <li>Integration with HA state management</li>
+ * <li>Dual-mode operation: local storage + forwarding</li>
+ * </ul>
+ */
+public class StoreAndForwardLogGroupWriter extends ReplicationLogGroupWriter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(StoreAndForwardLogGroupWriter.class);
+
+  /**
+   * Constructor for StoreAndForwardLogGroupWriter.
+   */
+  public StoreAndForwardLogGroupWriter(ReplicationLogGroup logGroup) {
+    super(logGroup);
+    LOG.debug("Created StoreAndForwardLogGroupWriter for HA Group: {}", 
logGroup.getHaGroupName());
+  }
+
+  @Override
+  public void init() throws IOException {
+    // TODO
+  }
+
+  @Override
+  public void close() {
+    // TODO
+  }
+
+  @Override
+  protected void initializeFileSystems() throws IOException {
+    // TODO
+  }
+
+  @Override
+  protected LogFileWriter createNewWriter() throws IOException {
+    // TODO
+    return null;
+  }
+}
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSource.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
similarity index 91%
rename from 
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSource.java
rename to 
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
index 907e3398fe..798c048136 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSource.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
@@ -19,12 +19,12 @@ package org.apache.phoenix.replication.metrics;
 
 import org.apache.hadoop.hbase.metrics.BaseSource;
 
-/** Interface for metrics related to ReplicationLog operations. */
-public interface MetricsReplicationLogSource extends BaseSource {
+/** Interface for metrics related to ReplicationLogGroup operations. */
+public interface MetricsReplicationLogGroupSource extends BaseSource {
 
-  String METRICS_NAME = "ReplicationLog";
+  String METRICS_NAME = "ReplicationLogGroup";
   String METRICS_CONTEXT = "phoenix";
-  String METRICS_DESCRIPTION = "Metrics about Phoenix Replication Log 
Operations";
+  String METRICS_DESCRIPTION = "Metrics about Replication Log Operations for 
an HA Group";
   String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
 
   String TIME_BASED_ROTATION_COUNT = "timeBasedRotationCount";
@@ -108,7 +108,11 @@ public interface MetricsReplicationLogSource extends 
BaseSource {
    */
   void incrementRotationFailureCount();
 
+  /**
+   * Unregister this metrics source.
+   */
+  void close();
+
   // Get current values for testing
   ReplicationLogMetricValues getCurrentMetricValues();
-
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSourceImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
similarity index 86%
rename from 
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSourceImpl.java
rename to 
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
index fcb08efde9..7b3bb5789c 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSourceImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
@@ -18,12 +18,13 @@
 package org.apache.phoenix.replication.metrics;
 
 import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableFastCounter;
 import org.apache.hadoop.metrics2.lib.MutableHistogram;
 
 /** Implementation of metrics source for ReplicationLog operations. */
-public class MetricsReplicationLogSourceImpl extends BaseSourceImpl
-  implements MetricsReplicationLogSource {
+public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl
+  implements MetricsReplicationLogGroupSource {
 
   private final MutableFastCounter timeBasedRotationCount;
   private final MutableFastCounter sizeBasedRotationCount;
@@ -35,13 +36,14 @@ public class MetricsReplicationLogSourceImpl extends 
BaseSourceImpl
   private final MutableHistogram rotationTime;
   private final MutableHistogram ringBufferTime;
 
-  public MetricsReplicationLogSourceImpl() {
-    this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, 
METRICS_JMX_CONTEXT);
+  public MetricsReplicationLogGroupSourceImpl(String haGroupName) {
+    this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, 
METRICS_JMX_CONTEXT, haGroupName);
   }
 
-  public MetricsReplicationLogSourceImpl(String metricsName, String 
metricsDescription,
-    String metricsContext, String metricsJmxContext) {
-    super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+  public MetricsReplicationLogGroupSourceImpl(String metricsName, String 
metricsDescription,
+    String metricsContext, String metricsJmxContext, String haGroupName) {
+    super(metricsName, metricsDescription, metricsContext,
+      metricsJmxContext + ",haGroup=" + haGroupName);
     timeBasedRotationCount = 
getMetricsRegistry().newCounter(TIME_BASED_ROTATION_COUNT,
       TIME_BASED_ROTATION_COUNT_DESC, 0L);
     sizeBasedRotationCount = 
getMetricsRegistry().newCounter(SIZE_BASED_ROTATION_COUNT,
@@ -57,6 +59,11 @@ public class MetricsReplicationLogSourceImpl extends 
BaseSourceImpl
     ringBufferTime = getMetricsRegistry().newHistogram(RING_BUFFER_TIME, 
RING_BUFFER_TIME_DESC);
   }
 
+  @Override
+  public void close() {
+    DefaultMetricsSystem.instance().unregisterSource(metricsJmxContext);
+  }
+
   @Override
   public void incrementTimeBasedRotationCount() {
     timeBasedRotationCount.incr();
@@ -124,10 +131,4 @@ public class MetricsReplicationLogSourceImpl extends 
BaseSourceImpl
   public String getMetricsContext() {
     return METRICS_CONTEXT;
   }
-
-  @Override
-  public String getMetricsJmxContext() {
-    return METRICS_JMX_CONTEXT;
-  }
-
 }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
similarity index 73%
rename from 
phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
rename to 
phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index ced1527400..e6e5818430 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -43,25 +43,20 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.phoenix.replication.ReplicationLog.RotationReason;
+import org.apache.phoenix.replication.ReplicationLogGroupWriter.RotationReason;
 import org.apache.phoenix.replication.log.LogFile;
 import org.apache.phoenix.replication.log.LogFileReader;
 import org.apache.phoenix.replication.log.LogFileReaderContext;
 import org.apache.phoenix.replication.log.LogFileTestUtil;
 import org.apache.phoenix.replication.log.LogFileWriter;
-import org.apache.phoenix.replication.metrics.MetricsReplicationLogSource;
-import org.apache.phoenix.replication.metrics.MetricsReplicationLogSourceImpl;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.junit.After;
 import org.junit.Before;
@@ -75,9 +70,9 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ReplicationLogTest {
+public class ReplicationLogGroupTest {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationLogTest.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationLogGroupTest.class);
 
   @ClassRule
   public static TemporaryFolder testFolder = new TemporaryFolder();
@@ -86,7 +81,7 @@ public class ReplicationLogTest {
   private ServerName serverName;
   private FileSystem localFs;
   private URI standbyUri;
-  private ReplicationLog logWriter;
+  private ReplicationLogGroup logGroup;
 
   static final int TEST_RINGBUFFER_SIZE = 32;
   static final int TEST_SYNC_TIMEOUT = 1000;
@@ -99,29 +94,26 @@ public class ReplicationLogTest {
     localFs = FileSystem.getLocal(conf);
     standbyUri = new Path(testFolder.toString()).toUri();
     serverName = ServerName.valueOf("test", 60010, 
EnvironmentEdgeManager.currentTimeMillis());
-    conf.set(ReplicationLog.REPLICATION_STANDBY_HDFS_URL_KEY, 
standbyUri.toString());
+    conf.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY, 
standbyUri.toString());
     // Small ring buffer size for testing
-    conf.setInt(ReplicationLog.REPLICATION_LOG_RINGBUFFER_SIZE_KEY, 
TEST_RINGBUFFER_SIZE);
+    conf.setInt(ReplicationLogGroup.REPLICATION_LOG_RINGBUFFER_SIZE_KEY, 
TEST_RINGBUFFER_SIZE);
     // Set a short sync timeout for testing
-    conf.setLong(ReplicationLog.REPLICATION_LOG_SYNC_TIMEOUT_KEY, 
TEST_SYNC_TIMEOUT);
+    conf.setLong(ReplicationLogGroup.REPLICATION_LOG_SYNC_TIMEOUT_KEY, 
TEST_SYNC_TIMEOUT);
     // Set rotation time to 10 seconds
-    conf.setLong(ReplicationLog.REPLICATION_LOG_ROTATION_TIME_MS_KEY, 
TEST_ROTATION_TIME);
+    conf.setLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_TIME_MS_KEY, 
TEST_ROTATION_TIME);
     // Small size threshold for testing
-    conf.setLong(ReplicationLog.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY, 
TEST_ROTATION_SIZE_BYTES);
+    conf.setLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
+      TEST_ROTATION_SIZE_BYTES);
 
-    logWriter = spy(new TestableReplicationLog(conf, serverName));
-    logWriter.init();
+    logGroup = new TestableLogGroup(conf, serverName, "testHAGroup");
+    logGroup.init();
   }
 
   @After
   public void tearDown() throws Exception {
-    if (logWriter != null) {
-      logWriter.close();
+    if (logGroup != null) {
+      logGroup.close();
     }
-    // Deregister the metrics source that the replication log registers during 
initialization
-    // so the next unit will be able to register it again and successfully 
initialize.
-    DefaultMetricsSystem.instance()
-      .unregisterSource(MetricsReplicationLogSource.METRICS_JMX_CONTEXT);
   }
 
   /**
@@ -143,17 +135,17 @@ public class ReplicationLogTest {
     final Mutation put5 = LogFileTestUtil.newPut("row5", 5, 1);
 
     // Get the inner writer
-    LogFileWriter writer = logWriter.getWriter();
+    LogFileWriter writer = logGroup.getActiveWriter().getWriter();
     assertNotNull("Writer should not be null", writer);
     InOrder inOrder = Mockito.inOrder(writer);
 
-    logWriter.append(tableName, commitId1, put1);
-    logWriter.append(tableName, commitId2, put2);
-    logWriter.append(tableName, commitId3, put3);
-    logWriter.append(tableName, commitId4, put4);
-    logWriter.append(tableName, commitId5, put5);
+    logGroup.append(tableName, commitId1, put1);
+    logGroup.append(tableName, commitId2, put2);
+    logGroup.append(tableName, commitId3, put3);
+    logGroup.append(tableName, commitId4, put4);
+    logGroup.append(tableName, commitId5, put5);
 
-    logWriter.sync();
+    logGroup.sync();
 
     // Happens-before ordering verification, using Mockito's inOrder. Verify 
that the appends
     // happen before sync, and sync happened after appends.
@@ -165,40 +157,6 @@ public class ReplicationLogTest {
     inOrder.verify(writer, times(1)).sync();
   }
 
-  /**
-   * Tests the behavior when an append operation fails. Verifies that the 
system properly handles
-   * append failures by rolling to a new writer and retrying the operation.
-   */
-  @Test
-  public void testAppendFailureAndRetry() throws Exception {
-    final String tableName = "TBLAFR";
-    final long commitId = 1L;
-    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
-
-    // Get the inner writer
-    LogFileWriter writerBeforeRoll = logWriter.getWriter();
-    assertNotNull("Initial writer should not be null", writerBeforeRoll);
-
-    // Configure writerBeforeRoll to fail on the first append call
-    doThrow(new IOException("Simulated append 
failure")).when(writerBeforeRoll).append(anyString(),
-      anyLong(), any(Mutation.class));
-
-    // Append data
-    logWriter.append(tableName, commitId, put);
-    logWriter.sync();
-
-    // Get the inner writer we rolled to.
-    LogFileWriter writerAfterRoll = logWriter.getWriter();
-    assertNotNull("Rolled writer should not be null", writerAfterRoll);
-
-    // Verify the sequence: append (fail), rotate, append (succeed), sync
-    InOrder inOrder = Mockito.inOrder(writerBeforeRoll, writerAfterRoll);
-    inOrder.verify(writerBeforeRoll, times(1)).append(eq(tableName), 
eq(commitId), eq(put));
-    inOrder.verify(writerBeforeRoll, times(0)).sync(); // We failed append, 
did not try
-    inOrder.verify(writerAfterRoll, times(1)).append(eq(tableName), 
eq(commitId), eq(put)); // Retry
-    inOrder.verify(writerAfterRoll, times(1)).sync();
-  }
-
   /**
    * Tests the behavior when a sync operation fails. Verifies that the system 
properly handles sync
    * failures by rolling to a new writer and retrying the operation.
@@ -210,18 +168,18 @@ public class ReplicationLogTest {
     final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
 
     // Get the inner writer
-    LogFileWriter writerBeforeRoll = logWriter.getWriter();
+    LogFileWriter writerBeforeRoll = logGroup.getActiveWriter().getWriter();
     assertNotNull("Initial writer should not be null", writerBeforeRoll);
 
     // Configure writerBeforeRoll to fail on the first sync call
     doThrow(new IOException("Simulated sync 
failure")).when(writerBeforeRoll).sync();
 
     // Append data
-    logWriter.append(tableName, commitId, put);
-    logWriter.sync();
+    logGroup.append(tableName, commitId, put);
+    logGroup.sync();
 
     // Get the inner writer we rolled to.
-    LogFileWriter writerAfterRoll = logWriter.getWriter();
+    LogFileWriter writerAfterRoll = logGroup.getActiveWriter().getWriter();
     assertNotNull("Initial writer should not be null", writerBeforeRoll);
 
     // Verify the sequence: append, sync (fail), rotate, append (retry), sync 
(succeed)
@@ -243,7 +201,7 @@ public class ReplicationLogTest {
     long commitId = 0;
 
     // Get the inner writer
-    LogFileWriter innerWriter = logWriter.getWriter();
+    LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
     assertNotNull("Inner writer should not be null", innerWriter);
 
     // Create a slow consumer to fill up the ring buffer.
@@ -257,7 +215,7 @@ public class ReplicationLogTest {
 
     // Fill up the ring buffer by sending enough events.
     for (int i = 0; i < TEST_RINGBUFFER_SIZE; i++) {
-      logWriter.append(tableName, commitId++, put);
+      logGroup.append(tableName, commitId++, put);
     }
 
     // Now try to append when the ring is full. This should block until space 
becomes
@@ -268,7 +226,7 @@ public class ReplicationLogTest {
     Thread appendThread = new Thread(() -> {
       try {
         startFuture.complete(null);
-        logWriter.append(tableName, myCommitId, put);
+        logGroup.append(tableName, myCommitId, put);
         appendFuture.complete(null);
       } catch (IOException e) {
         appendFuture.completeExceptionally(e);
@@ -293,6 +251,40 @@ public class ReplicationLogTest {
     verify(innerWriter, timeout(10000).times(1)).append(eq(tableName), 
eq(myCommitId), any());
   }
 
+  /**
+   * Tests the behavior when an append operation fails. Verifies that the 
system properly handles
+   * append failures by rolling to a new writer and retrying the operation.
+   */
+  @Test
+  public void testAppendFailureAndRetry() throws Exception {
+    final String tableName = "TBLAFR";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    // Get the inner writer
+    LogFileWriter writerBeforeRoll = logGroup.getActiveWriter().getWriter();
+    assertNotNull("Initial writer should not be null", writerBeforeRoll);
+
+    // Configure writerBeforeRoll to fail on the first append call
+    doThrow(new IOException("Simulated append 
failure")).when(writerBeforeRoll).append(anyString(),
+      anyLong(), any(Mutation.class));
+
+    // Append data
+    logGroup.append(tableName, commitId, put);
+    logGroup.sync();
+
+    // Get the inner writer we rolled to.
+    LogFileWriter writerAfterRoll = logGroup.getActiveWriter().getWriter();
+    assertNotNull("Rolled writer should not be null", writerAfterRoll);
+
+    // Verify the sequence: append (fail), rotate, append (succeed), sync
+    InOrder inOrder = Mockito.inOrder(writerBeforeRoll, writerAfterRoll);
+    inOrder.verify(writerBeforeRoll, times(1)).append(eq(tableName), 
eq(commitId), eq(put));
+    inOrder.verify(writerBeforeRoll, times(0)).sync(); // We failed append, 
did not try
+    inOrder.verify(writerAfterRoll, times(1)).append(eq(tableName), 
eq(commitId), eq(put)); // Retry
+    inOrder.verify(writerAfterRoll, times(1)).sync();
+  }
+
   /**
    * Tests the sync timeout behavior. Verifies that sync operations time out 
after the configured
    * interval if they cannot complete.
@@ -304,7 +296,7 @@ public class ReplicationLogTest {
     final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
 
     // Get the inner writer
-    LogFileWriter innerWriter = logWriter.getWriter();
+    LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
     assertNotNull("Inner writer should not be null", innerWriter);
 
     doAnswer(new Answer<Object>() {
@@ -317,11 +309,11 @@ public class ReplicationLogTest {
     }).when(innerWriter).sync();
 
     // Append some data
-    logWriter.append(tableName, commitId, put);
+    logGroup.append(tableName, commitId, put);
 
     // Try to sync and expect it to timeout
     try {
-      logWriter.sync();
+      logGroup.sync();
       fail("Expected sync to timeout");
     } catch (IOException e) {
       assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
@@ -342,7 +334,7 @@ public class ReplicationLogTest {
     final CountDownLatch completionLatch = new CountDownLatch(2);
 
     // Get the inner writer
-    LogFileWriter innerWriter = logWriter.getWriter();
+    LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
     assertNotNull("Inner writer should not be null", innerWriter);
 
     // Thread 1: Append mutations with even commit IDs
@@ -352,7 +344,7 @@ public class ReplicationLogTest {
         for (int i = 0; i < APPENDS_PER_THREAD; i++) {
           final long commitId = i * 2;
           final Mutation put = LogFileTestUtil.newPut("row" + commitId, 
commitId, 1);
-          logWriter.append(tableName, commitId, put);
+          logGroup.append(tableName, commitId, put);
         }
       } catch (Exception e) {
         fail("Producer 1 failed: " + e.getMessage());
@@ -368,7 +360,7 @@ public class ReplicationLogTest {
         for (int i = 0; i < APPENDS_PER_THREAD; i++) {
           final long commitId = i * 2 + 1;
           final Mutation put = LogFileTestUtil.newPut("row" + commitId, 
commitId, 1);
-          logWriter.append(tableName, commitId, put);
+          logGroup.append(tableName, commitId, put);
         }
       } catch (Exception e) {
         fail("Producer 2 failed: " + e.getMessage());
@@ -387,7 +379,7 @@ public class ReplicationLogTest {
 
     // Perform a sync to ensure all appends are processed.
     InOrder inOrder = Mockito.inOrder(innerWriter); // To verify the below 
sync.
-    logWriter.sync();
+    logGroup.sync();
     // Verify the final sync was called.
     inOrder.verify(innerWriter, times(1)).sync();
 
@@ -396,7 +388,6 @@ public class ReplicationLogTest {
       final long commitId = i;
       verify(innerWriter, times(1)).append(eq(tableName), eq(commitId), any());
     }
-
   }
 
   /**
@@ -410,22 +401,22 @@ public class ReplicationLogTest {
     final long commitId = 1L;
 
     // Get the initial writer
-    LogFileWriter writerBeforeRotation = logWriter.getWriter();
+    LogFileWriter writerBeforeRotation = 
logGroup.getActiveWriter().getWriter();
     assertNotNull("Initial writer should not be null", writerBeforeRotation);
 
     // Append some data
-    logWriter.append(tableName, commitId, put);
-    logWriter.sync();
+    logGroup.append(tableName, commitId, put);
+    logGroup.sync();
 
     // Wait for rotation time to elapse
     Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
 
     // Append more data to trigger rotation check
-    logWriter.append(tableName, commitId + 1, put);
-    logWriter.sync();
+    logGroup.append(tableName, commitId + 1, put);
+    logGroup.sync();
 
     // Get the new writer after rotation
-    LogFileWriter writerAfterRotation = logWriter.getWriter();
+    LogFileWriter writerAfterRotation = logGroup.getActiveWriter().getWriter();
     assertNotNull("New writer should not be null", writerAfterRotation);
     assertTrue("Writer should have been rotated", writerAfterRotation != 
writerBeforeRotation);
 
@@ -460,23 +451,23 @@ public class ReplicationLogTest {
     final Mutation put = LogFileTestUtil.newPut("row", 1, 10);
     long commitId = 1L;
 
-    LogFileWriter writerBeforeRotation = logWriter.getWriter();
+    LogFileWriter writerBeforeRotation = 
logGroup.getActiveWriter().getWriter();
     assertNotNull("Initial writer should not be null", writerBeforeRotation);
 
     // Append enough data so that we exceed the size threshold.
     for (int i = 0; i < 100; i++) {
-      logWriter.append(tableName, commitId++, put);
+      logGroup.append(tableName, commitId++, put);
     }
-    logWriter.sync(); // Should trigger a sized based rotation
+    logGroup.sync(); // Should trigger a sized based rotation
 
     // Get the new writer after the expected rotation.
-    LogFileWriter writerAfterRotation = logWriter.getWriter();
+    LogFileWriter writerAfterRotation = logGroup.getActiveWriter().getWriter();
     assertNotNull("New writer should not be null", writerAfterRotation);
     assertTrue("Writer should have been rotated", writerAfterRotation != 
writerBeforeRotation);
 
     // Append one more mutation to verify we're using the new writer.
-    logWriter.append(tableName, commitId, put);
-    logWriter.sync();
+    logGroup.append(tableName, commitId, put);
+    logGroup.sync();
 
     // Verify the sequence of operations
     InOrder inOrder = Mockito.inOrder(writerBeforeRotation, 
writerAfterRotation);
@@ -501,21 +492,21 @@ public class ReplicationLogTest {
     final long commitId = 1L;
 
     // Get the inner writer
-    LogFileWriter innerWriter = logWriter.getWriter();
+    LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
     assertNotNull("Inner writer should not be null", innerWriter);
 
     // Append some data
-    logWriter.append(tableName, commitId, put);
+    logGroup.append(tableName, commitId, put);
 
     // Close the log writer
-    logWriter.close();
+    logGroup.close();
 
     // Verify the inner writer was closed
     verify(innerWriter, times(1)).close();
 
     // Verify we can't append after close
     try {
-      logWriter.append(tableName, commitId + 1, put);
+      logGroup.append(tableName, commitId + 1, put);
       fail("Expected append to fail after close");
     } catch (IOException e) {
       // Expected
@@ -523,14 +514,14 @@ public class ReplicationLogTest {
 
     // Verify we can't sync after close
     try {
-      logWriter.sync();
+      logGroup.sync();
       fail("Expected sync to fail after close");
     } catch (IOException e) {
       // Expected
     }
 
     // Verify we can close multiple times without error
-    logWriter.close();
+    logGroup.close();
   }
 
   /**
@@ -543,16 +534,16 @@ public class ReplicationLogTest {
     final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
     long commitId = 1L;
 
-    LogFileWriter writerBeforeRotation = logWriter.getWriter();
+    LogFileWriter writerBeforeRotation = 
logGroup.getActiveWriter().getWriter();
     assertNotNull("Initial writer should not be null", writerBeforeRotation);
 
     // Append some data and wait for the rotation time to elapse plus a small 
buffer.
-    logWriter.append(tableName, commitId, put);
-    logWriter.sync();
+    logGroup.append(tableName, commitId, put);
+    logGroup.sync();
     Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
 
     // Get the new writer after the rotation.
-    LogFileWriter writerAfterRotation = logWriter.getWriter();
+    LogFileWriter writerAfterRotation = logGroup.getActiveWriter().getWriter();
     assertNotNull("New writer should not be null", writerAfterRotation);
     assertTrue("Writer should have been rotated", writerAfterRotation != 
writerBeforeRotation);
 
@@ -584,8 +575,10 @@ public class ReplicationLogTest {
     final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
     long commitId = 1L;
 
+    ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
     // Get the initial writer
-    LogFileWriter initialWriter = logWriter.getWriter();
+    LogFileWriter initialWriter = logGroupWriter.getWriter();
     assertNotNull("Initial writer should not be null", initialWriter);
 
     // Configure the log writer to fail only the first time when creating new 
writers.
@@ -595,27 +588,27 @@ public class ReplicationLogTest {
         throw new IOException("Simulated failure to create new writer");
       }
       return invocation.callRealMethod();
-    }).when(logWriter).createNewWriter(any(FileSystem.class), any(URI.class));
+    }).when(logGroupWriter).createNewWriter();
 
     // Append some data
-    logWriter.append(tableName, commitId, put);
-    logWriter.sync();
+    logGroup.append(tableName, commitId, put);
+    logGroup.sync();
 
     // Rotate the log.
-    LogFileWriter writerAfterFailedRotate = 
logWriter.rotateLog(RotationReason.TIME);
+    LogFileWriter writerAfterFailedRotate = 
logGroupWriter.rotateLog(RotationReason.TIME);
     assertEquals("Should still be using the initial writer", initialWriter,
       writerAfterFailedRotate);
 
     // While rotation is failing, verify we can continue to use the current 
writer.
-    logWriter.append(tableName, commitId + 1, put);
-    logWriter.sync();
+    logGroup.append(tableName, commitId + 1, put);
+    logGroup.sync();
 
-    LogFileWriter writerAfterRotate = logWriter.rotateLog(RotationReason.TIME);
+    LogFileWriter writerAfterRotate = 
logGroupWriter.rotateLog(RotationReason.TIME);
     assertNotEquals("Should be using a new writer", initialWriter, 
writerAfterRotate);
 
     // Try to append more data. This should work with the new writer after 
successful rotation.
-    logWriter.append(tableName, commitId + 2, put);
-    logWriter.sync();
+    logGroup.append(tableName, commitId + 2, put);
+    logGroup.sync();
 
     // Verify operations went to the writers in the correct order
     InOrder inOrder = Mockito.inOrder(initialWriter, writerAfterRotate);
@@ -640,23 +633,26 @@ public class ReplicationLogTest {
     final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
     long commitId = 1L;
 
-    LogFileWriter initialWriter = logWriter.getWriter();
+    ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
+    // Get the initial writer
+    LogFileWriter initialWriter = logGroupWriter.getWriter();
     assertNotNull("Initial writer should not be null", initialWriter);
 
     // Configure the log writer to always fail when creating new writers
-    doThrow(new IOException("Simulated failure to create new 
writer")).when(logWriter)
-      .createNewWriter(any(FileSystem.class), any(URI.class));
+    doThrow(new IOException("Simulated failure to create new 
writer")).when(logGroupWriter)
+      .createNewWriter();
 
     // Append some data
-    logWriter.append(tableName, commitId, put);
-    logWriter.sync();
+    logGroup.append(tableName, commitId, put);
+    logGroup.sync();
 
     // Try to rotate the log multiple times until we exceed the retry limit
-    for (int i = 0; i <= 
ReplicationLog.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES; i++) {
+    for (int i = 0; i <= 
ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES; i++) {
       try {
-        logWriter.rotateLog(RotationReason.TIME);
+        logGroupWriter.rotateLog(RotationReason.TIME);
       } catch (IOException e) {
-        if (i < ReplicationLog.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES) {
+        if (i < ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES) {
           // Not the last attempt yet, continue
           continue;
         }
@@ -670,8 +666,8 @@ public class ReplicationLogTest {
 
     // Verify subsequent operations fail because the log is closed
     try {
-      logWriter.append(tableName, commitId + 1, put);
-      logWriter.sync();
+      logGroup.append(tableName, commitId + 1, put);
+      logGroup.sync();
       fail("Expected append to fail because log is closed");
     } catch (IOException e) {
       assertTrue("Expected an IOException because log is closed",
@@ -690,7 +686,7 @@ public class ReplicationLogTest {
     final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
 
     // Get the inner writer
-    LogFileWriter innerWriter = logWriter.getWriter();
+    LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
     assertNotNull("Writer should not be null", innerWriter);
 
     // Configure writer to throw a RuntimeException on append
@@ -698,9 +694,9 @@ public class ReplicationLogTest {
       anyLong(), any(Mutation.class));
 
     // Append data. This should trigger the LogExceptionHandler, which will 
close logWriter.
-    logWriter.append(tableName, commitId, put);
+    logGroup.append(tableName, commitId, put);
     try {
-      logWriter.sync();
+      logGroup.sync();
       fail("Should have thrown IOException because sync timed out");
     } catch (IOException e) {
       assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
@@ -708,7 +704,7 @@ public class ReplicationLogTest {
 
     // Verify that subsequent operations fail because the log is closed
     try {
-      logWriter.append(tableName, commitId + 1, put);
+      logGroup.append(tableName, commitId + 1, put);
       fail("Should have thrown IOException because log is closed");
     } catch (IOException e) {
       assertTrue("Expected an IOException because log is closed",
@@ -729,30 +725,31 @@ public class ReplicationLogTest {
     final long commitId = 1L;
     final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
 
+    ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
     // Get the initial writer
-    LogFileWriter initialWriter = logWriter.getWriter();
+    LogFileWriter initialWriter = logGroupWriter.getWriter();
     assertNotNull("Initial writer should not be null", initialWriter);
 
     // Configure initial writer to fail on sync
     doThrow(new IOException("Simulated sync 
failure")).when(initialWriter).sync();
 
     // createNewWriter should keep returning the bad writer
-    doAnswer(invocation -> 
initialWriter).when(logWriter).createNewWriter(any(FileSystem.class),
-      any(URI.class));
+    doAnswer(invocation -> 
initialWriter).when(logGroupWriter).createNewWriter();
 
     // Append data
-    logWriter.append(tableName, commitId, put);
+    logGroup.append(tableName, commitId, put);
 
     // Try to sync. Should fail after exhausting retries.
     try {
-      logWriter.sync();
+      logGroup.sync();
       fail("Expected sync to fail after exhausting retries");
     } catch (IOException e) {
       assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
     }
 
     // Each retry creates a new writer, so that is at least 1 create + 5 
retries.
-    verify(logWriter, atLeast(6)).createNewWriter(any(FileSystem.class), 
any(URI.class));
+    verify(logGroupWriter, atLeast(6)).createNewWriter();
   }
 
   /**
@@ -766,24 +763,24 @@ public class ReplicationLogTest {
     long commitId = 1L;
 
     // Get the initial writer
-    LogFileWriter writerBeforeRotation = logWriter.getWriter();
+    LogFileWriter writerBeforeRotation = 
logGroup.getActiveWriter().getWriter();
     assertNotNull("Initial writer should not be null", writerBeforeRotation);
 
     // Append several items to fill currentBatch but don't sync yet
     for (int i = 0; i < 5; i++) {
-      logWriter.append(tableName, commitId + i, put);
+      logGroup.append(tableName, commitId + i, put);
     }
 
     // Force a rotation by waiting for rotation time to elapse
     Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
 
     // Get the new writer after rotation
-    LogFileWriter writerAfterRotation = logWriter.getWriter();
+    LogFileWriter writerAfterRotation = logGroup.getActiveWriter().getWriter();
     assertNotNull("New writer should not be null", writerAfterRotation);
     assertTrue("Writer should have been rotated", writerAfterRotation != 
writerBeforeRotation);
 
     // Now trigger a sync which should replay the currentBatch to the new 
writer
-    logWriter.sync();
+    logGroup.sync();
 
     // Verify the sequence of operations
     InOrder inOrder = Mockito.inOrder(writerBeforeRotation, 
writerAfterRotation);
@@ -817,18 +814,20 @@ public class ReplicationLogTest {
     final int NUM_RECORDS = 100;
     List<LogFile.Record> originalRecords = new ArrayList<>();
 
+    ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
     // Get the path of the log file.
-    Path logPath = logWriter.getWriter().getContext().getFilePath();
+    Path logPath = logGroupWriter.getWriter().getContext().getFilePath();
 
     for (int i = 0; i < NUM_RECORDS; i++) {
       LogFile.Record record = LogFileTestUtil.newPutRecord(tableName, i, "row" 
+ i, i, 1);
       originalRecords.add(record);
-      logWriter.append(record.getHBaseTableName(), record.getCommitId(), 
record.getMutation());
+      logGroup.append(record.getHBaseTableName(), record.getCommitId(), 
record.getMutation());
     }
-    logWriter.sync(); // Sync to commit the appends to the current writer.
+    logGroup.sync(); // Sync to commit the appends to the current writer.
 
     // Force a rotation to close the current writer.
-    logWriter.rotateLog(ReplicationLog.RotationReason.SIZE);
+    logGroupWriter.rotateLog(RotationReason.SIZE);
 
     assertTrue("Log file should exist", localFs.exists(logPath));
 
@@ -869,10 +868,12 @@ public class ReplicationLogTest {
     List<LogFile.Record> originalRecords = new ArrayList<>();
     List<Path> logPaths = new ArrayList<>();
 
+    ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
     // Write records across multiple rotations.
     for (int rotation = 0; rotation < NUM_ROTATIONS; rotation++) {
       // Get the path of the current log file.
-      Path logPath = logWriter.getWriter().getContext().getFilePath();
+      Path logPath = logGroupWriter.getWriter().getContext().getFilePath();
       logPaths.add(logPath);
 
       for (int i = 0; i < NUM_RECORDS_PER_ROTATION; i++) {
@@ -880,11 +881,11 @@ public class ReplicationLogTest {
         LogFile.Record record =
           LogFileTestUtil.newPutRecord(tableName, commitId, "row" + commitId, 
commitId, 1);
         originalRecords.add(record);
-        logWriter.append(record.getHBaseTableName(), record.getCommitId(), 
record.getMutation());
+        logGroup.append(record.getHBaseTableName(), record.getCommitId(), 
record.getMutation());
       }
-      logWriter.sync(); // Sync to commit the appends to the current writer.
+      logGroup.sync(); // Sync to commit the appends to the current writer.
       // Force a rotation to close the current writer.
-      logWriter.rotateLog(ReplicationLog.RotationReason.SIZE);
+      logGroupWriter.rotateLog(RotationReason.SIZE);
     }
 
     // Verify all log files exist
@@ -934,10 +935,12 @@ public class ReplicationLogTest {
     List<LogFile.Record> originalRecords = new ArrayList<>();
     List<Path> logPaths = new ArrayList<>();
 
+    ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
     // Write records across multiple rotations, only syncing 50% of the time.
     for (int rotation = 0; rotation < NUM_ROTATIONS; rotation++) {
       // Get the path of the current log file.
-      Path logPath = logWriter.getWriter().getContext().getFilePath();
+      Path logPath = logGroupWriter.getWriter().getContext().getFilePath();
       logPaths.add(logPath);
 
       for (int i = 0; i < NUM_RECORDS_PER_ROTATION; i++) {
@@ -945,16 +948,16 @@ public class ReplicationLogTest {
         LogFile.Record record =
           LogFileTestUtil.newPutRecord(tableName, commitId, "row" + commitId, 
commitId, 1);
         originalRecords.add(record);
-        logWriter.append(record.getHBaseTableName(), record.getCommitId(), 
record.getMutation());
+        logGroup.append(record.getHBaseTableName(), record.getCommitId(), 
record.getMutation());
       }
 
       // Only sync 50% of the time before rotation. To ensure we sync on the 
last file
       // we are going to write, use 'rotation % 2 == 1' instead of 'rotation % 
2 == 0'.
       if (rotation % 2 == 1) {
-        logWriter.sync(); // Sync to commit the appends to the current writer.
+        logGroup.sync(); // Sync to commit the appends to the current writer.
       }
       // Force a rotation to close the current writer.
-      logWriter.rotateLog(ReplicationLog.RotationReason.SIZE);
+      logGroupWriter.rotateLog(RotationReason.SIZE);
     }
 
     // Verify all log files exist
@@ -999,17 +1002,17 @@ public class ReplicationLogTest {
     final long commitId = 1L;
     final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
 
-    // Get the inner writer
-    LogFileWriter innerWriter = logWriter.getWriter();
-    assertNotNull("Writer should not be null", innerWriter);
+    // Get the initial writer
+    LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
+    assertNotNull("Inner writer should not be null", innerWriter);
 
     // Configure writer to throw RuntimeException on getLength()
     doThrow(new RuntimeException("Simulated critical 
error")).when(innerWriter).getLength();
 
     // Append data. This should trigger the LogExceptionHandler, which will 
close logWriter.
-    logWriter.append(tableName, commitId, put);
+    logGroup.append(tableName, commitId, put);
     try {
-      logWriter.sync();
+      logGroup.sync();
       fail("Should have thrown IOException because sync timed out");
     } catch (IOException e) {
       assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
@@ -1017,7 +1020,7 @@ public class ReplicationLogTest {
 
     // Verify that subsequent operations fail because the log is closed
     try {
-      logWriter.append(tableName, commitId + 1, put);
+      logGroup.append(tableName, commitId + 1, put);
       fail("Should have thrown IOException because log is closed");
     } catch (IOException e) {
       assertTrue("Expected an IOException because log is closed",
@@ -1039,7 +1042,7 @@ public class ReplicationLogTest {
     final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
 
     // Get the inner writer
-    LogFileWriter innerWriter = logWriter.getWriter();
+    LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
     assertNotNull("Writer should not be null", innerWriter);
 
     // Configure writer to throw RuntimeException on append
@@ -1047,9 +1050,9 @@ public class ReplicationLogTest {
       anyLong(), any(Mutation.class));
 
     // Append data to trigger closeOnError()
-    logWriter.append(tableName, commitId, put);
+    logGroup.append(tableName, commitId, put);
     try {
-      logWriter.sync();
+      logGroup.sync();
       fail("Should have thrown IOException because sync timed out");
     } catch (IOException e) {
       assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
@@ -1057,7 +1060,7 @@ public class ReplicationLogTest {
 
     // Verify that subsequent append operations fail because the log is closed
     try {
-      logWriter.append(tableName, commitId, put);
+      logGroup.append(tableName, commitId, put);
       fail("Should have thrown IOException because log is closed");
     } catch (IOException e) {
       assertTrue("Expected an IOException because log is closed",
@@ -1079,7 +1082,7 @@ public class ReplicationLogTest {
     final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
 
     // Get the inner writer
-    LogFileWriter innerWriter = logWriter.getWriter();
+    LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
     assertNotNull("Writer should not be null", innerWriter);
 
     // Configure writer to throw RuntimeException on append
@@ -1087,9 +1090,9 @@ public class ReplicationLogTest {
       anyLong(), any(Mutation.class));
 
     // Append data to trigger closeOnError()
-    logWriter.append(tableName, commitId, put);
+    logGroup.append(tableName, commitId, put);
     try {
-      logWriter.sync();
+      logGroup.sync();
       fail("Should have thrown IOException because sync timed out");
     } catch (IOException e) {
       assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
@@ -1097,7 +1100,7 @@ public class ReplicationLogTest {
 
     // Verify that subsequent sync operations fail because the log is closed
     try {
-      logWriter.sync();
+      logGroup.sync();
       fail("Should have thrown IOException because log is closed");
     } catch (IOException e) {
       assertTrue("Expected an IOException because log is closed",
@@ -1108,175 +1111,6 @@ public class ReplicationLogTest {
     verify(innerWriter, times(1)).close();
   }
 
-  /**
-   * Tests race condition between LogRotationTask and LogEventHandler when 
both try to rotate the
-   * writer simultaneously. Verifies that despite concurrent rotation 
attempts, the log is only
-   * rotated once. Uses latches to ensure true concurrency and verify the 
sequence of operations.
-   */
-  @Test
-  public void testConcurrentRotationAttempts() throws Exception {
-    final String tableName = "TBLCR";
-    final long commitId = 1L;
-    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
-
-    // Get the initial writer
-    LogFileWriter initialWriter = logWriter.getWriter();
-    assertNotNull("Initial writer should not be null", initialWriter);
-
-    // Create latches to control timing and track rotation attempts
-    final CountDownLatch rotationTaskStarted = new CountDownLatch(1);
-    final CountDownLatch rotationTaskCanProceed = new CountDownLatch(1);
-    final CountDownLatch eventHandlerStarted = new CountDownLatch(1);
-    final CountDownLatch eventHandlerCanProceed = new CountDownLatch(1);
-    final AtomicInteger rotationCount = new AtomicInteger(0);
-    final CountDownLatch bothAttemptsStarted = new CountDownLatch(2);
-
-    // Configure the rotation task to pause at specific points and track 
attempts
-    doAnswer(invocation -> {
-      rotationTaskStarted.countDown(); // Signal that rotation task has started
-      bothAttemptsStarted.countDown(); // Signal that this attempt has started
-      rotationTaskCanProceed.await(); // Wait for permission to proceed
-      rotationCount.incrementAndGet(); // Track this rotation attempt
-      return invocation.callRealMethod();
-    }).when(logWriter).rotateLog(ReplicationLog.RotationReason.TIME);
-
-    // Configure the event handler to pause at specific points
-    doAnswer(invocation -> {
-      eventHandlerStarted.countDown(); // Signal that event handler has started
-      bothAttemptsStarted.countDown(); // Signal that this attempt has started
-      eventHandlerCanProceed.await(); // Wait for permission to proceed
-      return invocation.callRealMethod();
-    }).when(logWriter).getWriter();
-
-    // Start a thread that will trigger rotation via the background task
-    Thread rotationThread = new Thread(() -> {
-      try {
-        // Force rotation by waiting for rotation time
-        Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    });
-    rotationThread.start();
-
-    // Start append operation in main thread
-    logWriter.append(tableName, commitId, put);
-
-    // Wait for both attempts to start - this ensures true concurrency
-    assertTrue("Both rotation attempts should start",
-      bothAttemptsStarted.await(5, TimeUnit.SECONDS));
-
-    // Verify both attempts have started before proceeding
-    assertEquals("Both attempts should have started", 0, 
bothAttemptsStarted.getCount());
-
-    // Allow both operations to proceed simultaneously
-    eventHandlerCanProceed.countDown();
-    rotationTaskCanProceed.countDown();
-
-    // Wait for both operations to complete
-    rotationThread.join();
-
-    // Verify the final state
-    LogFileWriter finalWriter = logWriter.getWriter();
-    assertNotNull("Final writer should not be null", finalWriter);
-    assertTrue("Writer should have been rotated", finalWriter != 
initialWriter);
-
-    // Verify only one rotation actually occurred
-    assertEquals("Should have only one actual rotation", 1, 
rotationCount.get());
-
-    // Verify all operations completed successfully
-    logWriter.sync();
-
-    // Verify the sequence of operations through the latches
-    assertTrue("Rotation task should have started", 
rotationTaskStarted.getCount() == 0);
-    assertTrue("Event handler should have started", 
eventHandlerStarted.getCount() == 0);
-  }
-
-  /**
-   * Tests race condition between LogEventHandler retry loop and 
LogRotationTask when both try to
-   * rotate the writer simultaneously. Verifies that despite concurrent 
rotation attempts during a
-   * retry scenario, the log is only rotated once. Uses latches to ensure true 
concurrency and
-   * verify the sequence of operations.
-   */
-  @Test
-  public void testConcurrentRotationDuringRetry() throws Exception {
-    final String tableName = "TBLCRR";
-    final long commitId = 1L;
-    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
-
-    // Get the initial writer
-    LogFileWriter initialWriter = logWriter.getWriter();
-    assertNotNull("Initial writer should not be null", initialWriter);
-
-    // Create latches to control timing and track rotation attempts
-    final CountDownLatch retryStarted = new CountDownLatch(1);
-    final CountDownLatch retryCanProceed = new CountDownLatch(1);
-    final CountDownLatch rotationTaskStarted = new CountDownLatch(1);
-    final CountDownLatch rotationTaskCanProceed = new CountDownLatch(1);
-    final AtomicInteger rotationCount = new AtomicInteger(0);
-    final CountDownLatch bothAttemptsStarted = new CountDownLatch(2);
-
-    // Configure the writer to fail on first append, succeed on retry
-    doAnswer(invocation -> {
-      retryStarted.countDown(); // Signal that retry has started
-      bothAttemptsStarted.countDown(); // Signal that this attempt has started
-      retryCanProceed.await(); // Wait for permission to proceed
-      return invocation.callRealMethod();
-    }).when(initialWriter).append(anyString(), anyLong(), any(Mutation.class));
-
-    // Configure the rotation task to pause at specific points and track 
attempts
-    doAnswer(invocation -> {
-      rotationTaskStarted.countDown(); // Signal that rotation task has started
-      bothAttemptsStarted.countDown(); // Signal that this attempt has started
-      rotationTaskCanProceed.await(); // Wait for permission to proceed
-      rotationCount.incrementAndGet(); // Track this rotation attempt
-      return invocation.callRealMethod();
-    }).when(logWriter).rotateLog(ReplicationLog.RotationReason.TIME);
-
-    // Start a thread that will trigger rotation via the background task
-    Thread rotationThread = new Thread(() -> {
-      try {
-        // Force rotation by waiting for rotation time
-        Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    });
-    rotationThread.start();
-
-    // Start append operation in main thread
-    logWriter.append(tableName, commitId, put);
-
-    // Wait for both attempts to start - this ensures true concurrency
-    assertTrue("Both rotation attempts should start",
-      bothAttemptsStarted.await(5, TimeUnit.SECONDS));
-
-    // Verify both attempts have started before proceeding
-    assertEquals("Both attempts should have started", 0, 
bothAttemptsStarted.getCount());
-
-    // Allow both operations to proceed simultaneously
-    retryCanProceed.countDown();
-    rotationTaskCanProceed.countDown();
-
-    // Wait for both operations to complete
-    rotationThread.join();
-
-    // Verify the final state
-    LogFileWriter finalWriter = logWriter.getWriter();
-    assertNotNull("Final writer should not be null", finalWriter);
-    assertTrue("Writer should have been rotated", finalWriter != 
initialWriter);
-
-    // Verify only one rotation actually occurred
-    assertEquals("Should have only one actual rotation", 1, 
rotationCount.get());
-
-    // Verify all operations completed successfully
-    logWriter.sync();
-
-    // Verify the sequence of operations through the latches
-    assertTrue("Retry should have started", retryStarted.getCount() == 0);
-    assertTrue("Rotation task should have started", 
rotationTaskStarted.getCount() == 0);
-  }
-
   /**
    * Tests that multiple sync requests are consolidated into a single sync 
operation on the inner
    * writer when they occur in quick succession. Verifies that the Disruptor 
batching and
@@ -1293,7 +1127,7 @@ public class ReplicationLogTest {
     final Mutation put3 = LogFileTestUtil.newPut("row3", 3, 1);
     final long commitId3 = 3L;
 
-    LogFileWriter innerWriter = logWriter.getWriter();
+    LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
     assertNotNull("Inner writer should not be null", innerWriter);
 
     // Configure writer to briefly hold up the LogEventHandler upon first 
append.
@@ -1308,12 +1142,12 @@ public class ReplicationLogTest {
     // Post appends and three syncs in quick succession. The first append will 
be delayed long
     // enough for the three syncs to appear in a single Disruptor batch. Then 
they should all
     // be consolidated into a single sync.
-    logWriter.append(tableName, commitId1, put1);
-    logWriter.sync();
-    logWriter.append(tableName, commitId2, put2);
-    logWriter.sync();
-    logWriter.append(tableName, commitId3, put3);
-    logWriter.sync();
+    logGroup.append(tableName, commitId1, put1);
+    logGroup.sync();
+    logGroup.append(tableName, commitId2, put2);
+    logGroup.sync();
+    logGroup.append(tableName, commitId3, put3);
+    logGroup.sync();
 
     // Verify the sequence of operations on the inner writer: the three 
appends, then exactly
     // one sync.
@@ -1324,21 +1158,102 @@ public class ReplicationLogTest {
     inOrder.verify(innerWriter, times(1)).sync(); // Only one sync should be 
called
   }
 
-  static class TestableReplicationLog extends ReplicationLog {
+  /**
+   * Tests that ReplicationLogGroup.get() returns the same instance for the 
same haGroupId. Verifies
+   * that multiple calls with the same parameters return the cached instance.
+   */
+  @Test
+  public void testReplicationLogGroupCaching() throws Exception {
+    final String haGroupId1 = "testHAGroup1";
+    final String haGroupId2 = "testHAGroup2";
+
+    // Get instances for the first HA group
+    ReplicationLogGroup g1_1 = ReplicationLogGroup.get(conf, serverName, 
haGroupId1);
+    ReplicationLogGroup g1_2 = ReplicationLogGroup.get(conf, serverName, 
haGroupId1);
+
+    // Verify same instance is returned for same haGroupId
+    assertNotNull("ReplicationLogGroup should not be null", g1_1);
+    assertNotNull("ReplicationLogGroup should not be null", g1_2);
+    assertTrue("Same instance should be returned for same haGroupId", g1_2 == 
g1_1);
+    assertEquals("HA Group name should match", haGroupId1, 
g1_1.getHaGroupName());
+
+    // Get instance for a different HA group
+    ReplicationLogGroup g2_1 = ReplicationLogGroup.get(conf, serverName, 
haGroupId2);
+    assertNotNull("ReplicationLogGroup should not be null", g2_1);
+    assertTrue("Different instance should be returned for different 
haGroupId", g2_1 != g1_1);
+    assertEquals("HA Group name should match", haGroupId2, 
g2_1.getHaGroupName());
+
+    // Verify multiple calls still return cached instances
+    ReplicationLogGroup g1_3 = ReplicationLogGroup.get(conf, serverName, 
haGroupId1);
+    ReplicationLogGroup g2_2 = ReplicationLogGroup.get(conf, serverName, 
haGroupId2);
+    assertTrue("Cached instance should be returned", g1_3 == g1_1);
+    assertTrue("Cached instance should be returned", g2_2 == g2_1);
+
+    // Clean up
+    g1_1.close();
+    g2_1.close();
+  }
+
+  /**
+   * Tests that close() removes the instance from the cache. Verifies that 
after closing, a new call
+   * to get() creates a new instance.
+   */
+  @Test
+  public void testReplicationLogGroupCacheRemovalOnClose() throws Exception {
+    final String haGroupId = "testHAGroupCacheRemoval";
+
+    // Get initial instance
+    ReplicationLogGroup g1_1 = ReplicationLogGroup.get(conf, serverName, 
haGroupId);
+    assertNotNull("ReplicationLogGroup should not be null", g1_1);
+    assertFalse("Group should not be closed initially", g1_1.isClosed());
+
+    // Verify cached instance is returned
+    ReplicationLogGroup g1_2 = ReplicationLogGroup.get(conf, serverName, 
haGroupId);
+    assertTrue("Same instance should be returned before close", g1_2 == g1_1);
+
+    // Close the group
+    g1_1.close();
+    assertTrue("Group should be closed", g1_1.isClosed());
+
+    // Get instance after close - should be a new instance
+    ReplicationLogGroup g1_3 = ReplicationLogGroup.get(conf, serverName, 
haGroupId);
+    assertNotNull("ReplicationLogGroup should not be null after close", g1_3);
+    assertFalse("New group should not be closed", g1_3.isClosed());
+    assertTrue("New instance should be created after close", g1_1 != g1_3);
+    assertEquals("HA Group name should match", haGroupId, 
g1_3.getHaGroupName());
+
+    // Clean up
+    g1_3.close();
+  }
 
-    protected TestableReplicationLog(Configuration conf, ServerName 
serverName) {
-      super(conf, serverName);
+  static class TestableLogGroup extends ReplicationLogGroup {
+
+    public TestableLogGroup(Configuration conf, ServerName serverName, String 
haGroupName) {
+      super(conf, serverName, haGroupName);
     }
 
     @Override
-    protected LogFileWriter createNewWriter(FileSystem fs, URI url) throws 
IOException {
-      return spy(super.createNewWriter(fs, url));
+    protected ReplicationLogGroupWriter createRemoteWriter() throws 
IOException {
+      ReplicationLogGroupWriter writer = spy(new 
TestableStandbyLogGroupWriter(this));
+      writer.init();
+      return writer;
+    }
+
+  }
+
+  /**
+   * Testable version of StandbyLogGroupWriter that allows spying on writers.
+   */
+  static class TestableStandbyLogGroupWriter extends StandbyLogGroupWriter {
+
+    protected TestableStandbyLogGroupWriter(ReplicationLogGroup logGroup) {
+      super(logGroup);
     }
 
     @Override
-    protected MetricsReplicationLogSource createMetricsSource() {
-      return new MetricsReplicationLogSourceImpl();
+    protected LogFileWriter createNewWriter() throws IOException {
+      LogFileWriter writer = super.createNewWriter();
+      return spy(writer);
     }
   }
-
 }

Reply via email to