This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new a0b899421f PHOENIX-7567 Replication Log Writer (Synchronous mode) 
(#2144)
a0b899421f is described below

commit a0b899421f57011da1b23e271b489bec005ba25c
Author: Andrew Purtell <[email protected]>
AuthorDate: Thu Dec 18 10:58:13 2025 -0800

    PHOENIX-7567 Replication Log Writer (Synchronous mode) (#2144)
    
    Conflicts:
            phoenix-core-server/pom.xml
            phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
---
 phoenix-core-server/pom.xml                        |   12 +
 .../apache/phoenix/replication/ReplicationLog.java | 1085 ++++++++++++++++
 .../phoenix/replication/log/LogFileWriter.java     |   17 +-
 .../metrics/MetricsReplicationLogSource.java       |  114 ++
 .../metrics/MetricsReplicationLogSourceImpl.java   |  133 ++
 .../metrics/ReplicationLogMetricValues.java        |   83 ++
 .../phoenix/replication/metrics/package-info.java  |   23 +
 .../apache/phoenix/replication/package-info.java   |   23 +
 .../phoenix/replication/tool/LogFileAnalyzer.java  |  209 +++
 .../org/apache/phoenix/replication/tool/README.md  |   70 +
 .../phoenix/replication/tool/package-info.java     |   22 +
 .../phoenix/replication/ReplicationLogTest.java    | 1344 ++++++++++++++++++++
 pom.xml                                            |    2 +-
 13 files changed, 3135 insertions(+), 2 deletions(-)

diff --git a/phoenix-core-server/pom.xml b/phoenix-core-server/pom.xml
index 54c6afe39b..ef5140feb9 100644
--- a/phoenix-core-server/pom.xml
+++ b/phoenix-core-server/pom.xml
@@ -87,6 +87,14 @@
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-metrics-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop-compat</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-hadoop2-compat</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-client</artifactId>
@@ -177,6 +185,10 @@
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.lmax</groupId>
+      <artifactId>disruptor</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
new file mode 100644
index 0000000000..3a2d9567a0
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -0,0 +1,1085 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.YieldingWaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogSource;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogSourceImpl;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * The ReplicationLog implements a high-performance logging system for mutations that need to be
+ * replicated to another cluster.
+ * <p>
+ * Key features:
+ * <ul>
+ * <li>Asynchronous append operations with batching for high throughput</li>
+ * <li>Controlled blocking sync operations with timeout and retry logic</li>
+ * <li>Automatic log rotation based on time and size thresholds</li>
+ * <li>Sharded directory structure for better HDFS performance</li>
+ * <li>Metrics for monitoring</li>
+ * <li>Fail-stop behavior on critical errors</li>
+ * </ul>
+ * <p>
+ * The class supports three replication modes (though currently only SYNC is implemented):
+ * <ul>
+ * <li>SYNC: Direct writes to standby cluster (default)</li>
+ * <li>STORE_AND_FORWARD: Local storage when standby is unavailable</li>
+ * <li>SYNC_AND_FORWARD: Concurrent direct writes and queue draining</li>
+ * </ul>
+ * <p>
+ * Key configuration properties:
+ * <ul>
+ * <li>{@link #REPLICATION_STANDBY_HDFS_URL_KEY}: URL for standby cluster HDFS</li>
+ * <li>{@link #REPLICATION_FALLBACK_HDFS_URL_KEY}: URL for local fallback storage</li>
+ * <li>{@link #REPLICATION_NUM_SHARDS_KEY}: Number of shard directories</li>
+ * <li>{@link #REPLICATION_LOG_ROTATION_TIME_MS_KEY}: Time-based rotation interval</li>
+ * <li>{@link #REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY}: Size-based rotation threshold</li>
+ * <li>{@link #REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY}: Compression algorithm</li>
+ * </ul>
+ * <p>
+ * This class is intended to be thread-safe.
+ * <p>
+ * Architecture Overview:
+ *
+ * <pre>
+ * ┌──────────────────────────────────────────────────────────────────────┐
+ * │                           ReplicationLog                             │
+ * │                                                                      │
+ * │  ┌─────────────┐     ┌────────────────────────────────────────────┐  │
+ * │  │             │     │                                            │  │
+ * │  │  Producers  │     │  Disruptor Ring Buffer                     │  │
+ * │  │  (append/   │────▶│  ┌─────────┐ ┌─────────┐ ┌─────────┐       │  │
+ * │  │   sync)     │     │  │ Event 1 │ │ Event 2 │ │ Event 3 │ ...   │  │
+ * │  │             │     │  └─────────┘ └─────────┘ └─────────┘       │  │
+ * │  └─────────────┘     └────────────────────────────────────────────┘  │
+ * │                                │                                     │
+ * │                                ▼                                     │
+ * │  ┌─────────────────────────────────────────────────────────────┐     │
+ * │  │                                                             │     │
+ * │  │  LogEventHandler                                            │     │
+ * │  │  ┌──────────────────────────────────────────────────────┐   │     │
+ * │  │  │                                                      │   │     │
+ * │  │  │  - Batch Management                                  │   │     │
+ * │  │  │  - Writer Rotation                                   │   │     │
+ * │  │  │  - Error Handling                                    │   │     │
+ * │  │  │  - Mode Transitions                                  │   │     │
+ * │  │  │                                                      │   │     │
+ * │  │  └──────────────────────────────────────────────────────┘   │     │
+ * │  │                             │                               │     │
+ * │  │                             ▼                               │     │
+ * │  │  ┌──────────────────────────────────────────────────────┐   │     │
+ * │  │  │                                                      │   │     │
+ * │  │  │  LogFileWriter                                       │   │     │
+ * │  │  │  - File Management                                   │   │     │
+ * │  │  │  - Compression                                       │   │     │
+ * │  │  │  - HDFS Operations                                   │   │     │
+ * │  │  │                                                      │   │     │
+ * │  │  └──────────────────────────────────────────────────────┘   │     │
+ * │  └─────────────────────────────────────────────────────────────┘     │
+ * └──────────────────────────────────────────────────────────────────────┘
+ * </pre>
+ * <p>
+ * A high-performance ring buffer decouples the API from the complexity of writer management.
+ * Calls to append generally return quickly; sync must suspend the caller until the sync
+ * operation succeeds (or times out). An internal single-threaded process handles these events,
+ * encapsulating the complexity of batching mutations for efficiency, consolidating multiple
+ * in-flight syncs, rotating writers based on time or size, error handling and retries, and mode
+ * transitions.
+ */
[email protected](
+    value = { "EI_EXPOSE_REP", "EI_EXPOSE_REP2", "MS_EXPOSE_REP" }, justification = "Intentional")
+public class ReplicationLog {
+
+  /** The path on the standby HDFS where log files should be written. (The "IN" directory.) */
+  public static final String REPLICATION_STANDBY_HDFS_URL_KEY =
+    "phoenix.replication.log.standby.hdfs.url";
+  /**
+   * The path on the active HDFS where log files should be written when we have fallen back to
+   * store and forward mode. (The "OUT" directory.)
+   */
+  public static final String REPLICATION_FALLBACK_HDFS_URL_KEY =
+    "phoenix.replication.log.fallback.hdfs.url";
+  /**
+   * The number of shards (subfolders) to maintain in the "IN" directory.
+   * <p>
+   * Shard directories have the format shard-NNNNN, e.g. shard-00001. The maximum value is 100000.
+   */
+  public static final String REPLICATION_NUM_SHARDS_KEY = "phoenix.replication.log.shards";
+  public static final int DEFAULT_REPLICATION_NUM_SHARDS = 1000;
+  public static final int MAX_REPLICATION_NUM_SHARDS = 100000;
+  /** Replication log rotation time trigger, default is 1 minute */
+  public static final String REPLICATION_LOG_ROTATION_TIME_MS_KEY =
+    "phoenix.replication.log.rotation.time.ms";
+  public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 1000L;
+  /** Replication log rotation size trigger, default is 256 MB */
+  public static final String REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY =
+    "phoenix.replication.log.rotation.size.bytes";
+  public static final long DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES = 256 * 1024 * 1024L;
+  /** Replication log rotation size trigger percentage, default is 0.95 */
+  public static final String REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY =
+    "phoenix.replication.log.rotation.size.percentage";
+  public static final double DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE = 0.95;
+  /** Replication log compression, default is "NONE" */
+  public static final String REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY =
+    "phoenix.replication.log.compression";
+  public static final String DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM = "NONE";
+  public static final String REPLICATION_LOG_RINGBUFFER_SIZE_KEY =
+    "phoenix.replication.log.ringbuffer.size";
+  public static final int DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE = 1024 * 32; // Too big?
+  public static final String REPLICATION_LOG_SYNC_TIMEOUT_KEY =
+    "phoenix.replication.log.sync.timeout.ms";
+  public static final long DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT = 1000 * 30;
+  public static final String REPLICATION_LOG_SYNC_RETRIES_KEY =
+    "phoenix.replication.log.sync.retries";
+  public static final int DEFAULT_REPLICATION_LOG_SYNC_RETRIES = 5;
+  public static final String REPLICATION_LOG_ROTATION_RETRIES_KEY =
+    "phoenix.replication.log.rotation.retries";
+  public static final int DEFAULT_REPLICATION_LOG_ROTATION_RETRIES = 5;
+  public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
+    "phoenix.replication.log.retry.delay.ms";
+  public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
+
+  public static final String SHARD_DIR_FORMAT = "shard%05d";
+  public static final String FILE_NAME_FORMAT = "%d-%s.plog";
+
+  static final byte EVENT_TYPE_DATA = 0;
+  static final byte EVENT_TYPE_SYNC = 1;
+
+  static final Logger LOG = LoggerFactory.getLogger(ReplicationLog.class);
+
+  protected static volatile ReplicationLog instance;
+
+  protected final Configuration conf;
+  protected final ServerName serverName;
+  protected FileSystem standbyFs;
+  protected FileSystem fallbackFs; // For store-and-forward (future use)
+  protected int numShards;
+  protected URI standbyUrl;
+  protected URI fallbackUrl; // For store-and-forward (future use)
+  protected final long rotationTimeMs;
+  protected final long rotationSizeBytes;
+  protected final int maxRotationRetries;
+  protected final Compression.Algorithm compression;
+  protected final ReentrantLock lock = new ReentrantLock();
+  protected volatile LogFileWriter currentWriter; // Current writer
+  protected final AtomicLong lastRotationTime = new AtomicLong();
+  protected final AtomicLong writerGeneration = new AtomicLong();
+  protected final AtomicLong rotationFailures = new AtomicLong(0);
+  protected ScheduledExecutorService rotationExecutor;
+  protected final int ringBufferSize;
+  protected final long syncTimeoutMs;
+  protected Disruptor<LogEvent> disruptor;
+  protected RingBuffer<LogEvent> ringBuffer;
+  protected final ConcurrentHashMap<Path, Object> shardMap = new 
ConcurrentHashMap<>();
+  protected final MetricsReplicationLogSource metrics;
+  protected volatile boolean isClosed = false;
+
+  /**
+   * Tracks the current replication mode of the ReplicationLog.
+   * <p>
+   * The replication mode determines how mutations are handled:
+   * <ul>
+   * <li>SYNC: Normal operation where mutations are written directly to the 
standby cluster's HDFS.
+   * This is the default and primary mode of operation.</li>
+   * <li>STORE_AND_FORWARD: Fallback mode when the standby cluster's HDFS is 
unavailable. Mutations
+   * are stored locally and will be forwarded when connectivity is 
restored.</li>
+   * <li>SYNC_AND_FORWARD: Transitional mode where new mutations are written 
directly to the standby
+   * cluster while concurrently draining the local queue of previously stored 
mutations.</li>
+   * </ul>
+   * <p>
+   * Mode transitions occur automatically based on the availability of the 
standby cluster's HDFS
+   * and the state of the local mutation queue.
+   */
+  protected enum ReplicationMode {
+    /**
+     * Normal operation where mutations are written directly to the standby 
cluster's HDFS. This is
+     * the default and primary mode of operation.
+     */
+    SYNC,
+
+    /**
+     * Fallback mode when the standby cluster's HDFS is unavailable. Mutations 
are stored locally
+     * and will be forwarded when connectivity is restored.
+     */
+    STORE_AND_FORWARD,
+
+    /**
+     * Transitional mode where new mutations are written directly to the 
standby cluster while
+     * concurrently draining the local queue of previously stored mutations. 
This mode is entered
+     * when connectivity to the standby cluster is restored while there are 
still mutations in the
+     * local queue.
+     */
+    SYNC_AND_FORWARD;
+  }
+
+  /** The reason for requesting a log rotation. */
+  protected enum RotationReason {
+    /** Rotation requested due to time threshold being exceeded. */
+    TIME,
+    /** Rotation requested due to size threshold being exceeded. */
+    SIZE,
+    /** Rotation requested due to an error condition. */
+    ERROR;
+  }
+
+  /**
+   * The current replication mode. Always SYNC for now.
+   * <p>
+   * TODO: Implement mode transitions to STORE_AND_FORWARD when standby 
becomes unavailable.
+   * <p>
+   * TODO: Implement mode transitions to SYNC_AND_FORWARD when draining queue.
+   */
+  protected volatile ReplicationMode currentMode = ReplicationMode.SYNC;
+
+  // TODO: Add configuration keys for store-and-forward behavior
+  // - Maximum retry attempts before switching to store-and-forward
+  // - Retry delay between attempts
+  // - Queue drain batch size
+  // - Queue drain interval
+
+  // TODO: Add state tracking fields
+  // - Queue of pending changes when in store-and-forward mode
+  // - Timestamp of last successful standby write
+  // - Error count for tracking consecutive failures
+
+  // TODO: Add methods for state transitions
+  // - switchToStoreAndForward() - Called when standby becomes unavailable
+  // - switchToSync() - Called when standby becomes available again
+  // - drainQueue() - Background task to process queued changes
+
+  // TODO: Enhance error handling in LogEventHandler
+  // - Track consecutive failures
+  // - Switch to store-and-forward after max retries
+
+  // TODO: Implement queue management for store-and-forward mode
+  // - Implement queue persistence to handle RegionServer restarts
+  // - Implement queue draining when in SYNC_AND_FORWARD state
+  // - Implement automatic recovery from temporary network issues
+  // - Add configurable thresholds for switching to store-and-forward based on 
write latency
+  // - Add circuit breaker pattern to prevent overwhelming the standby cluster
+  // - Add queue size limits and backpressure mechanisms
+  // - Add queue metrics for monitoring (queue size, oldest entry age, etc.)
+
+  // TODO: Enhance metrics for replication health monitoring
+  // - Add metrics for replication lag between active and standby
+  // - Track time spent in each replication mode (SYNC, STORE_AND_FORWARD, 
SYNC_AND_FORWARD)
+  // - Monitor queue drain rate and backlog size
+  // - Track consecutive failures and mode transition events
+
+  /**
+   * Gets the singleton ReplicationLog instance using the lazy initializer pattern, initializing
+   * the instance if it has not been created yet.
+   * @param conf       Configuration object.
+   * @param serverName The server name.
+   * @return The singleton ReplicationLog instance.
+   * @throws IOException If initialization fails.
+   */
+  public static ReplicationLog get(Configuration conf, ServerName serverName) throws IOException {
+    if (instance == null) {
+      synchronized (ReplicationLog.class) {
+        if (instance == null) {
+          // Complete initialization before assignment
+          ReplicationLog logManager = new ReplicationLog(conf, serverName);
+          logManager.init();
+          instance = logManager;
+        }
+      }
+    }
+    return instance;
+  }
+
+  protected ReplicationLog(Configuration conf, ServerName serverName) {
+    this.conf = conf;
+    this.serverName = serverName;
+    this.rotationTimeMs =
+      conf.getLong(REPLICATION_LOG_ROTATION_TIME_MS_KEY, 
DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS);
+    long rotationSize = conf.getLong(REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
+      DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES);
+    double rotationSizePercent = 
conf.getDouble(REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY,
+      DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE);
+    this.rotationSizeBytes = (long) (rotationSize * rotationSizePercent);
+    this.maxRotationRetries =
+      conf.getInt(REPLICATION_LOG_ROTATION_RETRIES_KEY, 
DEFAULT_REPLICATION_LOG_ROTATION_RETRIES);
+    this.numShards = conf.getInt(REPLICATION_NUM_SHARDS_KEY, 
DEFAULT_REPLICATION_NUM_SHARDS);
+    String compressionName = 
conf.get(REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY,
+      DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM);
+    Compression.Algorithm compression = Compression.Algorithm.NONE;
+    if 
(!DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM.equalsIgnoreCase(compressionName))
 {
+      try {
+        compression = 
Compression.getCompressionAlgorithmByName(compressionName);
+      } catch (IllegalArgumentException e) {
+        LOG.warn("Unknown compression type " + compressionName + ", using 
NONE", e);
+      }
+    }
+    this.compression = compression;
+    this.ringBufferSize =
+      conf.getInt(REPLICATION_LOG_RINGBUFFER_SIZE_KEY, 
DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE);
+    this.syncTimeoutMs =
+      conf.getLong(REPLICATION_LOG_SYNC_TIMEOUT_KEY, 
DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT);
+    this.metrics = createMetricsSource();
+  }
+
+  /** Creates a new metrics source for monitoring replication log operations. */
+  protected MetricsReplicationLogSource createMetricsSource() {
+    return new MetricsReplicationLogSourceImpl();
+  }
+
+  /** Returns the metrics source for monitoring replication log operations. */
+  public MetricsReplicationLogSource getMetrics() {
+    return metrics;
+  }
+
+  @SuppressWarnings("unchecked")
+  public void init() throws IOException {
+    if (numShards > MAX_REPLICATION_NUM_SHARDS) {
+      throw new IllegalArgumentException(REPLICATION_NUM_SHARDS_KEY + " is " + 
numShards
+        + ", but the limit is " + MAX_REPLICATION_NUM_SHARDS);
+    }
+    initializeFileSystems();
+    // Start time based rotation.
+    lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
+    startRotationExecutor();
+    // Create the initial writer. Do this before we call LogEventHandler.init().
+    currentWriter = createNewWriter(standbyFs, standbyUrl);
+    // Initialize the Disruptor. We use ProducerType.MULTI because multiple handlers might
+    // call append concurrently. We use YieldingWaitStrategy for low latency. When the ring
+    // buffer is full (controlled by REPLICATION_LOG_RINGBUFFER_SIZE_KEY), producers calling
+    // ringBuffer.next() will effectively block (by yielding/spinning), creating backpressure
+    // on the callers. This ensures appends don't proceed until there is space.
+    disruptor = new Disruptor<>(
+      LogEvent.EVENT_FACTORY, ringBufferSize, new ThreadFactoryBuilder()
+        .setNameFormat("ReplicationLogEventHandler-%d").setDaemon(true).build(),
+      ProducerType.MULTI, new YieldingWaitStrategy());
+    LogEventHandler eventHandler = new LogEventHandler();
+    eventHandler.init();
+    disruptor.handleEventsWith(eventHandler);
+    LogExceptionHandler exceptionHandler = new LogExceptionHandler();
+    disruptor.setDefaultExceptionHandler(exceptionHandler);
+    ringBuffer = disruptor.start();
+    LOG.info("ReplicationLogWriter started with ring buffer size {}", 
ringBufferSize);
+  }
+
+  /**
+   * Append a mutation to the log. This method is non-blocking and returns quickly, unless the
+   * ring buffer is full. The actual write happens asynchronously. We expect multiple append()
+   * calls followed by a sync(). The appends will be batched by the Disruptor. The ring buffer is
+   * not expected to fill under normal operation, but it can (and should) if the log file writer
+   * is unable to make progress due to an HDFS level disruption. Should we enter that condition,
+   * this method will block until the append can be inserted.
+   * <p>
+   * An internal error may trigger fail-stop behavior. Subsequent to fail-stop, this method will
+   * throw an IOException("Closed"). No further appends are allowed.
+   * @param tableName The name of the HBase table the mutation applies to.
+   * @param commitId  The commit identifier (e.g., SCN) associated with the mutation.
+   * @param mutation  The HBase Mutation (Put or Delete) to be logged.
+   * @throws IOException If the log has been closed.
+   */
+  public void append(String tableName, long commitId, Mutation mutation) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Append: table={}, commitId={}, mutation={}", tableName, 
commitId, mutation);
+    }
+    if (isClosed) {
+      throw new IOException("Closed");
+    }
+    long startTime = System.nanoTime();
+    // ringBuffer.next() claims the next sequence number. Because we initialize the Disruptor
+    // with ProducerType.MULTI, this call WILL BLOCK (by yielding/spinning) if the ring buffer
+    // is full, thus providing backpressure to the callers.
+    long sequence = ringBuffer.next();
+    try {
+      LogEvent event = ringBuffer.get(sequence);
+      event.setValues(EVENT_TYPE_DATA, new Record(tableName, commitId, mutation), null);
+      metrics.updateAppendTime(System.nanoTime() - startTime);
+    } finally {
+      // Always publish the claimed sequence, even if setting the event values failed.
+      ringBuffer.publish(sequence);
+    }
+  }
+
+  /**
+   * Ensures all previously appended records are durably persisted. This method blocks until the
+   * sync operation completes or fails, potentially after internal retries. All in-flight appends
+   * are batched and provided to the underlying LogWriter, which will then be synced. If there is
+   * a problem syncing the LogWriter we will retry, up to the retry limit, rolling the writer for
+   * each retry.
+   * <p>
+   * An internal error may trigger fail-stop behavior. Subsequent to fail-stop, this method will
+   * throw an IOException("Closed"). No further syncs are allowed.
+   * <p>
+   * NOTE: When the ReplicationLog is capable of switching between synchronous and fallback
+   * (store-and-forward) writers, this will be fairly bulletproof. Right now we will still try to
+   * roll the synchronous writer a few times before giving up.
+   * @throws IOException If the sync operation fails after retries, or if interrupted.
+   */
+   */
+  public void sync() throws IOException {
+    if (isClosed) {
+      throw new IOException("Closed");
+    }
+    syncInternal();
+  }
+
+  /**
+   * Internal implementation of sync that publishes a sync event to the ring 
buffer and waits for
+   * completion.
+   */
+  protected void syncInternal() throws IOException {
+    long startTime = System.nanoTime();
+    CompletableFuture<Void> syncFuture = new CompletableFuture<>();
+    long sequence = ringBuffer.next();
+    try {
+      LogEvent event = ringBuffer.get(sequence);
+      event.setValues(EVENT_TYPE_SYNC, null, syncFuture);
+    } finally {
+      ringBuffer.publish(sequence);
+    }
+    LOG.trace("Published EVENT_TYPE_SYNC at sequence {}", sequence);
+    try {
+      // Wait for the event handler to process up to and including this sync 
event
+      syncFuture.get(syncTimeoutMs, TimeUnit.MILLISECONDS);
+      metrics.updateSyncTime(System.nanoTime() - startTime);
+    } catch (InterruptedException e) {
+      // Almost certainly the regionserver is shutting down or aborting.
+      // TODO: Do we need to do more here?
+      Thread.currentThread().interrupt();
+      throw new InterruptedIOException("Interrupted while waiting for sync");
+    } catch (ExecutionException e) {
+      LOG.error("Sync operation failed", e.getCause());
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      } else {
+        throw new IOException("Sync operation failed", e.getCause());
+      }
+    } catch (TimeoutException e) {
+      String message = "Sync operation timed out";
+      LOG.error(message);
+      throw new IOException(message, e);
+    }
+  }
+
+  /** Initializes the standby and fallback filesystems and creates their log directories. */
+  protected void initializeFileSystems() throws IOException {
+    String standbyUrlString = conf.get(REPLICATION_STANDBY_HDFS_URL_KEY);
+    if (standbyUrlString == null) {
+      throw new IOException(REPLICATION_STANDBY_HDFS_URL_KEY + " is not 
configured");
+    }
+    // Only validate that the URI is well formed. We should not assume the 
scheme must be
+    // "hdfs" because perhaps the operator will substitute another FileSystem 
implementation
+    // for DistributedFileSystem.
+    try {
+      this.standbyUrl = new URI(standbyUrlString);
+    } catch (URISyntaxException e) {
+      throw new IOException(REPLICATION_STANDBY_HDFS_URL_KEY + " is not 
valid", e);
+    }
+    String fallbackUrlString = conf.get(REPLICATION_FALLBACK_HDFS_URL_KEY);
+    if (fallbackUrlString != null) {
+      // Only validate that the URI is well formed, as above.
+      try {
+        this.fallbackUrl = new URI(fallbackUrlString);
+      } catch (URISyntaxException e) {
+        throw new IOException(REPLICATION_FALLBACK_HDFS_URL_KEY + " is not 
valid", e);
+      }
+      this.fallbackFs = getFileSystem(fallbackUrl);
+      Path fallbackLogDir = new Path(fallbackUrl.getPath());
+      if (!fallbackFs.exists(fallbackLogDir)) {
+        LOG.info("Creating directory {}", fallbackUrlString);
+        if (!this.fallbackFs.mkdirs(fallbackLogDir)) {
+          throw new IOException("Failed to create directory: " + 
fallbackUrlString);
+        }
+      }
+    } else {
+      // We support a synchronous-replication-only option if the store-and-forward configuration
+      // keys are missing. This is outside the scope of the design spec but potentially useful
+      // for testing, and it also allows an operator to prefer failover consistency and
+      // simplicity over availability, even if that is not recommended. Log it at WARN level to
+      // focus appropriate attention. (Should it be ERROR?)
+      LOG.warn("Fallback not configured ({}), store-and-forward DISABLED.",
+        REPLICATION_FALLBACK_HDFS_URL_KEY);
+      this.fallbackFs = null;
+    }
+    // Configuration has been validated, and the store-and-forward directories have been created
+    // if configured; now create the standby side directories as needed.
+    this.standbyFs = getFileSystem(standbyUrl);
+    Path standbyLogDir = new Path(standbyUrl.getPath());
+    if (!standbyFs.exists(standbyLogDir)) {
+      LOG.info("Creating directory {}", standbyUrlString);
+      if (!standbyFs.mkdirs(standbyLogDir)) {
+        throw new IOException("Failed to create directory: " + 
standbyUrlString);
+      }
+    }
+  }
+
+  /** Gets a FileSystem instance for the given URI using the current 
configuration. */
+  protected FileSystem getFileSystem(URI uri) throws IOException {
+    return FileSystem.get(uri, conf);
+  }
+
+  /** Calculates the interval for checking log rotation based on the 
configured rotation time. */
+  protected long getRotationCheckInterval(long rotationTimeMs) {
+    long interval;
+    if (rotationTimeMs > 0) {
+      interval = rotationTimeMs / 4;
+    } else {
+      // If rotation time is not configured or invalid, use a sensible default 
like 10 seconds
+      interval = 10000L;
+    }
+    return interval;
+  }
+
+  /** Starts the background task for time-based log rotation. */
+  protected void startRotationExecutor() {
+    Preconditions.checkState(rotationExecutor == null, "Rotation executor 
already started");
+    rotationExecutor = Executors.newSingleThreadScheduledExecutor(
+      new 
ThreadFactoryBuilder().setNameFormat("ReplicationLogRotator-%d").setDaemon(true).build());
+    long interval = getRotationCheckInterval(rotationTimeMs);
+    rotationExecutor.scheduleWithFixedDelay(new LogRotationTask(), interval, 
interval,
+      TimeUnit.MILLISECONDS);
+  }
+
+  /** Stops the background task for time-based log rotation. */
+  protected void stopRotationExecutor() {
+    if (rotationExecutor != null) {
+      rotationExecutor.shutdownNow();
+      rotationExecutor = null;
+    }
+  }
+
+  /** Gets the current writer, rotating it if necessary based on size 
thresholds. */
+  protected LogFileWriter getWriter() throws IOException {
+    lock.lock();
+    try {
+      if (shouldRotate()) {
+        rotateLog(RotationReason.SIZE);
+      }
+      return currentWriter;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Checks if the current log file needs to be rotated based on time or size. 
Must be called under
+   * lock.
+   * @return true if rotation is needed, false otherwise.
+   * @throws IOException If an error occurs checking the file size.
+   */
+  protected boolean shouldRotate() throws IOException {
+    if (currentWriter == null) {
+      LOG.warn("Current writer is null, forcing rotation.");
+      return true;
+    }
+    // Check time threshold
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    long last = lastRotationTime.get();
+    if (now - last >= rotationTimeMs) {
+      LOG.debug("Rotating log file due to time threshold ({} ms elapsed, 
threshold {} ms)",
+        now - last, rotationTimeMs);
+      return true;
+    }
+
+    // Check size threshold (using actual file size for accuracy)
+    long currentSize = currentWriter.getLength();
+    if (currentSize >= rotationSizeBytes) {
+      LOG.debug("Rotating log file due to size threshold ({} bytes, threshold 
{} bytes)",
+        currentSize, rotationSizeBytes);
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Closes the current log writer and opens a new one, updating rotation 
metrics.
+   * <p>
+   * This method handles the rotation of log files, which can be triggered by:
+   * <ul>
+   * <li>Time threshold exceeded (TIME)</li>
+   * <li>Size threshold exceeded (SIZE)</li>
+   * <li>Error condition requiring rotation (ERROR)</li>
+   * </ul>
+   * <p>
+   * The method implements retry logic for handling rotation failures. If 
rotation fails, it retries
+   * up to maxRotationRetries times. If the number of failures exceeds 
maxRotationRetries, an
+   * exception is thrown. Otherwise, it logs a warning and continues with the 
current writer.
+   * <p>
+   * The method is thread-safe and uses a lock to ensure atomic rotation 
operations.
+   * @param reason The reason for requesting log rotation
+   * @return The new LogFileWriter instance if rotation succeeded, or the 
current writer if rotation
+   *         failed
+   * @throws IOException if rotation fails after exceeding maxRotationRetries
+   */
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"UL_UNRELEASED_LOCK",
+      justification = "False positive")
+  protected LogFileWriter rotateLog(RotationReason reason) throws IOException {
+    lock.lock();
+    try {
+      // Try to get the new writer first. If it fails we continue using the 
current writer.
+      // Increment the writer generation
+      LogFileWriter newWriter = createNewWriter(standbyFs, standbyUrl);
+      LOG.debug("Created new writer: {}", newWriter);
+      // Close the current writer
+      if (currentWriter != null) {
+        LOG.debug("Closing current writer: {}", currentWriter);
+        closeWriter(currentWriter);
+      }
+      currentWriter = newWriter;
+      lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
+      rotationFailures.set(0);
+      metrics.incrementRotationCount();
+      switch (reason) {
+        case TIME:
+          metrics.incrementTimeBasedRotationCount();
+          break;
+        case SIZE:
+          metrics.incrementSizeBasedRotationCount();
+          break;
+        case ERROR:
+          metrics.incrementErrorBasedRotationCount();
+          break;
+      }
+    } catch (IOException e) {
+      // If we fail to rotate the log, we increment the failure counter. If we 
have exceeded
+      // the maximum number of retries, we close the log and throw the 
exception. Otherwise
+      // we log a warning and continue.
+      metrics.incrementRotationFailureCount();
+      long numFailures = rotationFailures.getAndIncrement();
+      if (numFailures >= maxRotationRetries) {
+        LOG.warn("Failed to rotate log (attempt {}/{}), closing log", 
numFailures,
+          maxRotationRetries, e);
+        closeOnError();
+        throw e;
+      }
+      LOG.warn("Failed to rotate log (attempt {}/{}), retrying...", 
numFailures, maxRotationRetries,
+        e);
+    } finally {
+      lock.unlock();
+    }
+    return currentWriter;
+  }
+
+  /**
+   * Creates a new log file path in a sharded directory structure based on server name and
+   * timestamp.
+   */
+  protected Path makeWriterPath(FileSystem fs, URI url) throws IOException {
+    long timestamp = EnvironmentEdgeManager.currentTimeMillis();
+    // Hashing only the serverName would place all logs for a given regionserver in the same
+    // shard, but we expect some regionservers to have significantly more load than others, so
+    // we also hash the timestamp to distribute the logs randomly over all of the shards for a
+    // more even overall distribution.
+    int shard = (serverName.hashCode() ^ Long.hashCode(timestamp)) % numShards;
+    Path shardPath = new Path(url.getPath(), String.format(SHARD_DIR_FORMAT, shard));
+    // Ensure the shard directory exists. We track which shard directories we have probed or
+    // created to avoid a round trip to the namenode for repeats.
+    IOException[] exception = new IOException[1];
+    shardMap.computeIfAbsent(shardPath, p -> {
+      try {
+        if (!fs.exists(p)) {
+          if (!fs.mkdirs(p)) {
+            throw new IOException("Could not create path: " + p);
+          }
+        }
+      } catch (IOException e) {
+        exception[0] = e;
+      }
+      return p;
+    });
+    // If we faced an exception in computeIfAbsent, throw it
+    if (exception[0] != null) {
+      throw exception[0];
+    }
+    Path filePath = new Path(shardPath, String.format(FILE_NAME_FORMAT, 
timestamp, serverName));
+    return filePath;
+  }
+
+  /** Creates and initializes a new LogFileWriter for the given filesystem and URL. */
+  protected LogFileWriter createNewWriter(FileSystem fs, URI url) throws IOException {
+    Path filePath = makeWriterPath(fs, url);
+    LogFileWriterContext writerContext = new 
LogFileWriterContext(conf).setFileSystem(fs)
+      .setFilePath(filePath).setCompression(compression);
+    LogFileWriter newWriter = new LogFileWriter();
+    try {
+      newWriter.init(writerContext);
+      newWriter.setGeneration(writerGeneration.incrementAndGet());
+    } catch (IOException e) {
+      LOG.error("Failed to initialize new LogFileWriter for path {}", 
filePath, e);
+      throw e;
+    }
+    return newWriter;
+  }
+
+  /** Closes the given writer, logging any errors that occur during close. */
+  protected void closeWriter(LogFileWriter writer) {
+    if (writer == null) {
+      return;
+    }
+    try {
+      writer.close();
+    } catch (IOException e) {
+      // For now, just log and continue
+      LOG.error("Error closing log writer: " + writer, e);
+    }
+  }
+
+  /**
+   * Force closes the log upon an unrecoverable internal error. This is a 
fail-stop behavior: once
+   * called, the log is marked as closed, the Disruptor is halted, and all 
subsequent append() and
+   * sync() calls will throw an IOException("Closed"). This ensures that no 
further operations are
+   * attempted on a log that has encountered a critical error.
+   */
+  protected void closeOnError() {
+    lock.lock();
+    try {
+      if (isClosed) {
+        return;
+      }
+      isClosed = true;
+    } finally {
+      lock.unlock();
+    }
+    // Stop the time based rotation check.
+    stopRotationExecutor();
+    // We expect a final sync will not work. Just close the inner writer.
+    closeWriter(currentWriter);
+    // Directly halt the disruptor. shutdown() would wait for events to drain, which we expect
+    // will not work here.
+    disruptor.halt();
+  }
+
+  /** Closes the log. */
+  public void close() {
+    lock.lock();
+    try {
+      if (isClosed) {
+        return;
+      }
+      isClosed = true;
+    } finally {
+      lock.unlock();
+    }
+    // Stop the time based rotation check.
+    stopRotationExecutor();
+    // Sync before shutting down to flush all pending appends.
+    try {
+      syncInternal();
+      disruptor.shutdown(); // Wait for a clean shutdown.
+    } catch (IOException e) {
+      LOG.warn("Error during final sync on close", e);
+      disruptor.halt(); // Go directly to halt.
+    }
+    // We must wait for the disruptor to stop before closing the current writer.
+    closeWriter(currentWriter);
+  }
+
+  /** Implements time based rotation independent of in-line checking. */
+  protected class LogRotationTask implements Runnable {
+    @Override
+    public void run() {
+      if (isClosed) {
+        return;
+      }
+      // Use tryLock with a timeout to avoid blocking indefinitely if another 
thread holds
+      // the lock for an unexpectedly long time (e.g., during a problematic 
rotation).
+      boolean acquired = false;
+      try {
+        // Wait a short time for the lock
+        acquired = lock.tryLock(1, TimeUnit.SECONDS);
+        if (acquired) {
+          // Check only the time condition here, size is handled by getWriter
+          long now = EnvironmentEdgeManager.currentTimeMillis();
+          long last = lastRotationTime.get();
+          if (!isClosed && now - last >= rotationTimeMs) {
+            LOG.debug("Time based rotation needed ({} ms elapsed, threshold {} 
ms).", now - last,
+              rotationTimeMs);
+            try {
+              rotateLog(RotationReason.TIME); // rotateLog updates 
lastRotationTime
+            } catch (IOException e) {
+              LOG.error("Failed to rotate log, currentWriter is {}", 
currentWriter, e);
+              // More robust error handling goes here once the 
store-and-forward
+              // fallback is implemented. For now we just log the error and 
continue.
+            }
+          }
+        } else {
+          LOG.warn("LogRotationTask could not acquire lock, skipping check 
this time.");
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // Preserve interrupt status
+        LOG.warn("LogRotationTask interrupted while trying to acquire lock.");
+      } finally {
+        if (acquired) {
+          lock.unlock();
+        }
+      }
+    }
+  }
+
+  protected static class Record {
+    public String tableName;
+    public long commitId;
+    public Mutation mutation;
+
+    public Record(String tableName, long commitId, Mutation mutation) {
+      this.tableName = tableName;
+      this.commitId = commitId;
+      this.mutation = mutation;
+    }
+  }
+
+  /** Event structure for the Disruptor ring buffer containing data and sync 
operations. */
+  protected static class LogEvent {
+    protected static final EventFactory<LogEvent> EVENT_FACTORY = 
LogEvent::new;
+
+    protected int type;
+    protected Record record;
+    protected CompletableFuture<Void> syncFuture; // Used only for SYNC events
+    protected long timestampNs; // Timestamp when event was created
+
+    public void setValues(int type, Record record, CompletableFuture<Void> 
syncFuture) {
+      this.type = type;
+      this.record = record;
+      this.syncFuture = syncFuture;
+      this.timestampNs = System.nanoTime();
+    }
+  }
+
+  /**
+   * Handles events from the Disruptor, managing batching, writer rotation, 
and error handling.
+   */
+  protected class LogEventHandler implements EventHandler<LogEvent> {
+    protected final int maxRetries; // Configurable max retries for sync
+    protected final long retryDelayMs; // Configurable delay between retries
+    protected final List<Record> currentBatch = new ArrayList<>();
+    protected final List<CompletableFuture<Void>> pendingSyncFutures = new 
ArrayList<>();
+    protected LogFileWriter writer;
+    protected long generation;
+
+    protected LogEventHandler() {
+      this.maxRetries =
+        conf.getInt(REPLICATION_LOG_SYNC_RETRIES_KEY, 
DEFAULT_REPLICATION_LOG_SYNC_RETRIES);
+      this.retryDelayMs =
+        conf.getLong(REPLICATION_LOG_RETRY_DELAY_MS_KEY, 
DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS);
+    }
+
+    protected void init() throws IOException {
+      this.writer = getWriter();
+      this.generation = writer.getGeneration();
+    }
+
+    /**
+     * Processes all pending sync operations by syncing the current writer and 
completing their
+     * associated futures. This method is called when we are ready to process 
a set of consolidated
+     * sync requests and performs the following steps:
+     * <ol>
+     * <li>Syncs the current writer to ensure all data is durably written.</li>
+     * <li>Completes all pending sync futures successfully.</li>
+     * <li>Clears the list of pending sync futures.</li>
+     * <li>Clears the current batch of records since they have been 
successfully synced.</li>
+     * </ol>
+     * @param sequence The sequence number of the last processed event
+     * @throws IOException if the sync operation fails
+     */
+    protected void processPendingSyncs(long sequence) throws IOException {
+      if (pendingSyncFutures.isEmpty()) {
+        return;
+      }
+      writer.sync();
+      // Complete all pending sync futures
+      for (CompletableFuture<Void> future : pendingSyncFutures) {
+        future.complete(null);
+      }
+      pendingSyncFutures.clear();
+      // Sync completed, clear the list of in-flight appends.
+      currentBatch.clear();
+      LOG.trace("Sync operation completed successfully up to sequence {}", 
sequence);
+    }
+
+    /**
+     * Fails all pending sync operations with the given exception. This method 
is called when we
+     * encounter an unrecoverable error during the sync of the inner writer. 
It completes all
+     * pending sync futures that were consolidated exceptionally.
+     * <p>
+     * Note: This method does not clear the currentBatch list. The 
currentBatch must be preserved as
+     * it contains records that may need to be replayed if we successfully 
rotate to a new writer.
+     * @param sequence The sequence number of the last processed event
+     * @param e        The IOException that caused the failure
+     */
+    protected void failPendingSyncs(long sequence, IOException e) {
+      if (pendingSyncFutures.isEmpty()) {
+        return;
+      }
+      for (CompletableFuture<Void> future : pendingSyncFutures) {
+        future.completeExceptionally(e);
+      }
+      pendingSyncFutures.clear();
+      LOG.warn("Failed to process syncs at sequence {}", sequence, e);
+    }
+
+    /**
+     * Processes a single event from the Disruptor ring buffer. This method 
handles both data and
+     * sync events, with retry logic for handling IO failures.
+     * <p>
+     * For data events, it:
+     * <ol>
+     * <li>Checks if the writer has been rotated and replays any in-flight 
records.</li>
+     * <li>Appends the record to the current writer.</li>
+     * <li>Adds the record to the current batch for potential replay.</li>
+     * <li>Processes any pending syncs if this is the end of a batch.</li>
+     * </ol>
+     * <p>
+     * For sync events, it:
+     * <ol>
+     * <li>Adds the sync future to the pending list.</li>
+     * <li>Processes any pending syncs if this is the end of a batch.</li>
+     * </ol>
+     * If an IOException occurs, the method will attempt to rotate the writer 
and retry the
+     * operation up to the configured maximum number of retries. If all 
retries fail, it will fail
+     * all pending syncs and throw the exception.
+     * <p>
+     * The retry logic includes a configurable delay between attempts to 
prevent tight loops when
+     * there are persistent HDFS issues. This delay helps mitigate the risk of 
rapid cycling through
+     * writers when the underlying storage system is experiencing problems.
+     * @param event      The event to process
+     * @param sequence   The sequence number of the event
+     * @param endOfBatch Whether this is the last event in the current batch
+     * @throws Exception if the operation fails after all retries
+     */
+    @Override
+    public void onEvent(LogEvent event, long sequence, boolean endOfBatch) 
throws Exception {
+      // Calculate time spent in ring buffer
+      long currentTimeNs = System.nanoTime();
+      long ringBufferTimeNs = currentTimeNs - event.timestampNs;
+      metrics.updateRingBufferTime(ringBufferTimeNs);
+      writer = getWriter();
+      int attempt = 0;
+      while (attempt < maxRetries) {
+        try {
+          if (writer.getGeneration() > generation) {
+            generation = writer.getGeneration();
+            // If the writer has been rotated, we need to replay the current 
batch of
+            // in-flight appends into the new writer.
+            if (!currentBatch.isEmpty()) {
+              LOG.trace("Writer has been rotated, replaying in-flight batch");
+              for (Record r : currentBatch) {
+                writer.append(r.tableName, r.commitId, r.mutation);
+              }
+            }
+          }
+          switch (event.type) {
+            case EVENT_TYPE_DATA:
+              writer.append(event.record.tableName, event.record.commitId, 
event.record.mutation);
+              // Add to current batch only after we succeed at appending, so 
we don't
+              // replay it twice.
+              currentBatch.add(event.record);
+              // Process any pending syncs at the end of batch.
+              if (endOfBatch) {
+                processPendingSyncs(sequence);
+              }
+              return;
+            case EVENT_TYPE_SYNC:
+              // Add this sync future to the pending list.
+              pendingSyncFutures.add(event.syncFuture);
+              // Process any pending syncs at the end of batch.
+              if (endOfBatch) {
+                processPendingSyncs(sequence);
+              }
+              return;
+            default:
+              throw new UnsupportedOperationException("Unknown event type: " + 
event.type);
+          }
+        } catch (IOException e) {
+          // IO exception, force a rotation.
+          LOG.debug("Attempt " + (attempt + 1) + "/" + maxRetries + " failed", 
e);
+          if (attempt >= maxRetries) {
+            failPendingSyncs(sequence, e);
+            throw e;
+          }
+          attempt++;
+          // Add delay before retrying to prevent tight loops
+          try {
+            Thread.sleep(retryDelayMs);
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new InterruptedIOException("Interrupted during retry delay");
+          }
+          writer = rotateLog(RotationReason.ERROR);
+        }
+      }
+    }
+  }
+
+  /**
+   * Handler for critical errors during the Disruptor lifecycle that closes 
the writer to prevent
+   * data loss.
+   */
+  protected class LogExceptionHandler implements ExceptionHandler<LogEvent> {
+    @Override
+    public void handleEventException(Throwable e, long sequence, LogEvent 
event) {
+      String message = "Exception processing sequence " + sequence + "  for 
event " + event;
+      LOG.error(message, e);
+      closeOnError();
+    }
+
+    @Override
+    public void handleOnStartException(Throwable e) {
+      LOG.error("Exception during Disruptor startup", e);
+      closeOnError();
+    }
+
+    @Override
+    public void handleOnShutdownException(Throwable e) {
+      // Should not happen, but if it does, the regionserver is aborting or 
shutting down.
+      LOG.error("Exception during Disruptor shutdown", e);
+      closeOnError();
+    }
+  }
+
+}
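
A minimal caller-side sketch of the API added above, assuming the surrounding RegionServer code
supplies the ServerName, commit id, and Mutations; the standby URL shown is hypothetical, while
the key and method names are taken from the diff:

    // Caller-side sketch (assumed context: serverName, commitId, put, delete already exist).
    Configuration conf = HBaseConfiguration.create();
    conf.set(ReplicationLog.REPLICATION_STANDBY_HDFS_URL_KEY,
        "hdfs://standby-nn:8020/phoenix/replication/in");   // hypothetical standby URL
    conf.setLong(ReplicationLog.REPLICATION_LOG_ROTATION_TIME_MS_KEY, 60 * 1000L);

    ReplicationLog log = ReplicationLog.get(conf, serverName);
    // Several appends are batched by the Disruptor, then made durable by a single sync().
    log.append("MY_TABLE", commitId, put);
    log.append("MY_TABLE", commitId, delete);
    log.sync();   // blocks until the batch is durable on the standby HDFS, or times out
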
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
index 8560dc6ea3..0c883a4469 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
@@ -36,6 +36,13 @@ public class LogFileWriter implements LogFile.Writer {
   private LogFileWriterContext context;
   private LogFileFormatWriter writer;
   private boolean closed = false;
+  /**
+   * A monotonically increasing sequence number that identifies this writer instance, used to
+   * detect log file rotations and ensure proper handling of in-flight operations. Higher layers
+   * will get a new generation number that is higher than any previous generation and store it
+   * in the LogFileWriter via setGeneration().
+   */
+   */
+  private long generation;
 
   public LogFileWriter() {
 
@@ -45,6 +52,14 @@ public class LogFileWriter implements LogFile.Writer {
     return context;
   }
 
+  public void setGeneration(long generation) {
+    this.generation = generation;
+  }
+
+  public long getGeneration() {
+    return generation;
+  }
+
   @Override
   public void init(LogFileWriterContext context) throws IOException {
     this.context = context;
@@ -110,7 +125,7 @@ public class LogFileWriter implements LogFile.Writer {
   @Override
   public String toString() {
     return "LogFileWriter [writerContext=" + context + ", formatWriter=" + 
writer + ", closed="
-      + closed + "]";
+      + closed + ", generation=" + generation + "]";
   }
 
 }
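
The generation number added above is what lets the LogEventHandler in ReplicationLog.java detect
a rotation and replay in-flight records into the new writer. A condensed sketch of that pattern,
with hypothetical local variable names (lastSeenGeneration, inFlightBatch):

    // Rotation-detection sketch, mirroring LogEventHandler.onEvent().
    LogFileWriter writer = getWriter();                // may rotate based on size
    if (writer.getGeneration() > lastSeenGeneration) {
      lastSeenGeneration = writer.getGeneration();
      for (Record r : inFlightBatch) {                 // replay appends not yet synced
        writer.append(r.tableName, r.commitId, r.mutation);
      }
    }
    writer.append(tableName, commitId, mutation);      // then append the new record
    writer.sync();                                     // make the batch durable
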
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSource.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSource.java
new file mode 100644
index 0000000000..907e3398fe
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSource.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+/** Interface for metrics related to ReplicationLog operations. */
+public interface MetricsReplicationLogSource extends BaseSource {
+
+  String METRICS_NAME = "ReplicationLog";
+  String METRICS_CONTEXT = "phoenix";
+  String METRICS_DESCRIPTION = "Metrics about Phoenix Replication Log Operations";
+  String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
+
+  String TIME_BASED_ROTATION_COUNT = "timeBasedRotationCount";
+  String TIME_BASED_ROTATION_COUNT_DESC = "Number of time-based log rotations";
+
+  String SIZE_BASED_ROTATION_COUNT = "sizeBasedRotationCount";
+  String SIZE_BASED_ROTATION_COUNT_DESC = "Number of size-based log rotations";
+
+  String ERROR_BASED_ROTATION_COUNT = "errorBasedRotationCount";
+  String ERROR_BASED_ROTATION_COUNT_DESC = "Number of times rotateLog was called due to errors";
+
+  String ROTATION_COUNT = "rotationCount";
+  String ROTATION_COUNT_DESC = "Total number of times rotateLog was called";
+
+  String ROTATION_FAILURES = "rotationFailures";
+
+  String ROTATION_FAILURES_DESC = "Number of times log rotation has failed";
+  String APPEND_TIME = "appendTimeMs";
+  String APPEND_TIME_DESC = "Histogram of time taken for append operations in nanoseconds";
+
+  String SYNC_TIME = "syncTimeMs";
+  String SYNC_TIME_DESC = "Histogram of time taken for sync operations in nanoseconds";
+
+  String ROTATION_TIME = "rotationTimeMs";
+  String ROTATION_TIME_DESC = "Histogram of time taken for log rotations in nanoseconds";
+
+  String RING_BUFFER_TIME = "ringBufferTime";
+  String RING_BUFFER_TIME_DESC = "Time events spend in the ring buffer";
+
+  /**
+   * Increments the counter for time-based log rotations. This counter tracks 
the number of times
+   * the log was rotated due to the time threshold.
+   */
+  void incrementTimeBasedRotationCount();
+
+  /**
+   * Increments the counter for size-based log rotations. This counter tracks 
the number of times
+   * the log was rotated due to the size threshold.
+   */
+  void incrementSizeBasedRotationCount();
+
+  /**
+   * Increments the counter for error-based log rotations. This counter tracks 
the number of times
+   * the log was rotated due to errors.
+   */
+  void incrementErrorBasedRotationCount();
+
+  /**
+   * Increments the counter for total log rotations. This counter tracks the 
total number of times
+   * the log was rotated, regardless of reason.
+   */
+  void incrementRotationCount();
+
+  /**
+   * Update the time taken for an append operation in nanoseconds.
+   * @param timeNs Time taken in nanoseconds
+   */
+  void updateAppendTime(long timeNs);
+
+  /**
+   * Update the time taken for a sync operation in nanoseconds.
+   * @param timeNs Time taken in nanoseconds
+   */
+  void updateSyncTime(long timeNs);
+
+  /**
+   * Update the time taken for a rotation operation in nanoseconds.
+   * @param timeNs Time taken in nanoseconds
+   */
+  void updateRotationTime(long timeNs);
+
+  /**
+   * Update the time an event spent in the ring buffer in nanoseconds.
+   * @param timeNs Time spent in nanoseconds
+   */
+  void updateRingBufferTime(long timeNs);
+
+  /**
+   * Increments the counter for log rotation failures. This counter tracks the 
number of times log
+   * rotation has failed.
+   */
+  void incrementRotationFailureCount();
+
+  // Get current values for testing
+  ReplicationLogMetricValues getCurrentMetricValues();
+
+}
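
As a rough illustration of how this interface is meant to be used (a sketch, not code from this patch; the append itself is a placeholder), a caller could time an operation with System.nanoTime() and feed the histogram:

    MetricsReplicationLogSource metrics = new MetricsReplicationLogSourceImpl();
    long start = System.nanoTime();
    // ... perform an append against the underlying LogFileWriter ...
    metrics.updateAppendTime(System.nanoTime() - start);

Counters such as incrementSizeBasedRotationCount() would similarly be bumped at the point the corresponding rotation is triggered.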
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSourceImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSourceImpl.java
new file mode 100644
index 0000000000..fcb08efde9
--- /dev/null
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSourceImpl.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableHistogram;
+
+/** Implementation of metrics source for ReplicationLog operations. */
+public class MetricsReplicationLogSourceImpl extends BaseSourceImpl
+  implements MetricsReplicationLogSource {
+
+  private final MutableFastCounter timeBasedRotationCount;
+  private final MutableFastCounter sizeBasedRotationCount;
+  private final MutableFastCounter errorBasedRotationCount;
+  private final MutableFastCounter rotationCount;
+  private final MutableFastCounter rotationFailuresCount;
+  private final MutableHistogram appendTime;
+  private final MutableHistogram syncTime;
+  private final MutableHistogram rotationTime;
+  private final MutableHistogram ringBufferTime;
+
+  public MetricsReplicationLogSourceImpl() {
+    this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, 
METRICS_JMX_CONTEXT);
+  }
+
+  public MetricsReplicationLogSourceImpl(String metricsName, String 
metricsDescription,
+    String metricsContext, String metricsJmxContext) {
+    super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+    timeBasedRotationCount = 
getMetricsRegistry().newCounter(TIME_BASED_ROTATION_COUNT,
+      TIME_BASED_ROTATION_COUNT_DESC, 0L);
+    sizeBasedRotationCount = 
getMetricsRegistry().newCounter(SIZE_BASED_ROTATION_COUNT,
+      SIZE_BASED_ROTATION_COUNT_DESC, 0L);
+    errorBasedRotationCount = 
getMetricsRegistry().newCounter(ERROR_BASED_ROTATION_COUNT,
+      ERROR_BASED_ROTATION_COUNT_DESC, 0L);
+    rotationCount = getMetricsRegistry().newCounter(ROTATION_COUNT, 
ROTATION_COUNT_DESC, 0L);
+    rotationFailuresCount =
+      getMetricsRegistry().newCounter(ROTATION_FAILURES, 
ROTATION_FAILURES_DESC, 0L);
+    appendTime = getMetricsRegistry().newHistogram(APPEND_TIME, 
APPEND_TIME_DESC);
+    syncTime = getMetricsRegistry().newHistogram(SYNC_TIME, SYNC_TIME_DESC);
+    rotationTime = getMetricsRegistry().newHistogram(ROTATION_TIME, 
ROTATION_TIME_DESC);
+    ringBufferTime = getMetricsRegistry().newHistogram(RING_BUFFER_TIME, 
RING_BUFFER_TIME_DESC);
+  }
+
+  @Override
+  public void incrementTimeBasedRotationCount() {
+    timeBasedRotationCount.incr();
+  }
+
+  @Override
+  public void incrementSizeBasedRotationCount() {
+    sizeBasedRotationCount.incr();
+  }
+
+  @Override
+  public void incrementErrorBasedRotationCount() {
+    errorBasedRotationCount.incr();
+  }
+
+  @Override
+  public void incrementRotationCount() {
+    rotationCount.incr();
+  }
+
+  @Override
+  public void incrementRotationFailureCount() {
+    rotationFailuresCount.incr();
+  }
+
+  @Override
+  public void updateAppendTime(long timeNs) {
+    appendTime.add(timeNs);
+  }
+
+  @Override
+  public void updateSyncTime(long timeNs) {
+    syncTime.add(timeNs);
+  }
+
+  @Override
+  public void updateRotationTime(long timeNs) {
+    rotationTime.add(timeNs);
+  }
+
+  @Override
+  public void updateRingBufferTime(long timeNs) {
+    ringBufferTime.add(timeNs);
+  }
+
+  @Override
+  public ReplicationLogMetricValues getCurrentMetricValues() {
+    return new ReplicationLogMetricValues(timeBasedRotationCount.value(),
+      sizeBasedRotationCount.value(), errorBasedRotationCount.value(), 
rotationCount.value(),
+      rotationFailuresCount.value(), appendTime.getMax(), syncTime.getMax(), 
rotationTime.getMax(),
+      ringBufferTime.getMax());
+  }
+
+  @Override
+  public String getMetricsName() {
+    return METRICS_NAME;
+  }
+
+  @Override
+  public String getMetricsDescription() {
+    return METRICS_DESCRIPTION;
+  }
+
+  @Override
+  public String getMetricsContext() {
+    return METRICS_CONTEXT;
+  }
+
+  @Override
+  public String getMetricsJmxContext() {
+    return METRICS_JMX_CONTEXT;
+  }
+
+}
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
new file mode 100644
index 0000000000..23ee864747
--- /dev/null
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.metrics;
+
+/** Class to hold the values of all metrics tracked by the ReplicationLog 
metrics source. */
+public class ReplicationLogMetricValues {
+
+  private final long timeBasedRotationCount;
+  private final long sizeBasedRotationCount;
+  private final long errorBasedRotationCount;
+  private final long rotationCount;
+  private final long rotationFailuresCount;
+  private final long appendTime;
+  private final long syncTime;
+  private final long rotationTime;
+  private final long ringBufferTime;
+
+  public ReplicationLogMetricValues(long timeBasedRotationCount, long 
sizeBasedRotationCount,
+    long errorBasedRotationCount, long rotationCount, long 
rotationFailuresCount, long appendTime,
+    long syncTime, long rotationTime, long ringBufferTime) {
+    this.timeBasedRotationCount = timeBasedRotationCount;
+    this.sizeBasedRotationCount = sizeBasedRotationCount;
+    this.errorBasedRotationCount = errorBasedRotationCount;
+    this.rotationCount = rotationCount;
+    this.rotationFailuresCount = rotationFailuresCount;
+    this.appendTime = appendTime;
+    this.syncTime = syncTime;
+    this.rotationTime = rotationTime;
+    this.ringBufferTime = ringBufferTime;
+  }
+
+  public long getTimeBasedRotationCount() {
+    return timeBasedRotationCount;
+  }
+
+  public long getSizeBasedRotationCount() {
+    return sizeBasedRotationCount;
+  }
+
+  public long getErrorBasedRotationCount() {
+    return errorBasedRotationCount;
+  }
+
+  public long getRotationCount() {
+    return rotationCount;
+  }
+
+  public long getRotationFailuresCount() {
+    return rotationFailuresCount;
+  }
+
+  public long getAppendTime() {
+    return appendTime;
+  }
+
+  public long getSyncTime() {
+    return syncTime;
+  }
+
+  public long getRotationTime() {
+    return rotationTime;
+  }
+
+  public long getRingBufferTime() {
+    return ringBufferTime;
+  }
+
+}
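
In tests, this value object can be snapshotted before and after an operation and compared; a sketch, assuming a metrics reference of type MetricsReplicationLogSource and JUnit's assertEquals:

    ReplicationLogMetricValues before = metrics.getCurrentMetricValues();
    // ... trigger a log rotation ...
    ReplicationLogMetricValues after = metrics.getCurrentMetricValues();
    assertEquals(before.getRotationCount() + 1, after.getRotationCount());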
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/package-info.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/package-info.java
new file mode 100644
index 0000000000..209787baef
--- /dev/null
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package provides implementations for tracking and reporting various 
metrics associated
+ * with the replication process.
+ */
+package org.apache.phoenix.replication.metrics;
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/package-info.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/package-info.java
new file mode 100644
index 0000000000..3fba1505e0
--- /dev/null
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes and utilities for handling replication, 
including replication log
+ * management and system catalog WAL entry filtering.
+ */
+package org.apache.phoenix.replication;
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
new file mode 100644
index 0000000000..d5b1bb43d8
--- /dev/null
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.tool;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.replication.log.LogFile.Record;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command-line tool for analyzing Phoenix Replication Log files. This tool can:
+ * <ul>
+ * <li>Read a single log file or directory of log files</li>
+ * <li>Print file headers, trailers, and block headers</li>
+ * <li>Decode and display log record contents</li>
+ * <li>Verify checksums and report corruption</li>
+ * </ul>
+ */
+public class LogFileAnalyzer extends Configured implements Tool {
+  private static final Logger LOG = 
LoggerFactory.getLogger(LogFileAnalyzer.class);
+
+  private static final String USAGE = "Usage: LogFileAnalyzer [options] 
<path>\n" + "Options:\n"
+    + "  -h, --help        Show this help message\n"
+    + "  -v, --verbose     Show detailed information\n"
+    + "  -c, --check       Verify checksums and report corruption\n"
+    + "  -d, --decode      Decode and display record contents\n";
+
+  private boolean verbose = false;
+  private boolean decode = false;
+  private boolean check = false;
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (!parseArgs(args)) {
+      System.err.println(USAGE);
+      return 1;
+    }
+
+    Configuration conf = getConf();
+    if (conf == null) {
+      conf = HBaseConfiguration.create();
+      setConf(conf);
+    }
+
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      Path path = new Path(args[args.length - 1]);
+
+      if (!fs.exists(path)) {
+        System.err.println("Path does not exist: " + path);
+        return 1;
+      }
+
+      List<Path> filesToAnalyze = new ArrayList<>();
+      if (fs.getFileStatus(path).isDirectory()) {
+        // Recursively find all .plog files
+        findLogFiles(fs, path, filesToAnalyze);
+      } else {
+        filesToAnalyze.add(path);
+      }
+
+      if (filesToAnalyze.isEmpty()) {
+        System.err.println("No log files found in: " + path);
+        return 1;
+      }
+
+      // Analyze each file
+      for (Path file : filesToAnalyze) {
+        analyzeFile(fs, file);
+      }
+
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Error analyzing log files", e);
+      return 1;
+    }
+  }
+
+  private void findLogFiles(FileSystem fs, Path dir, List<Path> files) throws 
IOException {
+    FileStatus[] statuses = fs.listStatus(dir);
+    for (FileStatus status : statuses) {
+      Path path = status.getPath();
+      if (status.isDirectory()) {
+        findLogFiles(fs, path, files);
+      } else if (path.getName().endsWith(".plog")) {
+        files.add(path);
+      }
+    }
+  }
+
+  private void analyzeFile(FileSystem fs, Path file) throws IOException {
+    System.out.println("\nAnalyzing file: " + file);
+
+    LogFileReaderContext context = new 
LogFileReaderContext(getConf()).setFileSystem(fs)
+      .setFilePath(file).setSkipCorruptBlocks(check); // Skip corrupt blocks 
if checking
+
+    LogFileReader reader = new LogFileReader();
+    try {
+      reader.init(context);
+
+      // Print header information
+      System.out.println("Header:");
+      System.out.println("  Version: " + reader.getHeader().getMajorVersion() 
+ "."
+        + reader.getHeader().getMinorVersion());
+
+      // Process records
+      int recordCount = 0;
+      Record record;
+      while ((record = reader.next()) != null) {
+        recordCount++;
+        if (decode) {
+          System.out.println("\nRecord #" + recordCount + ":");
+          System.out.println("  Table: " + record.getHBaseTableName());
+          System.out.println("  Commit ID: " + record.getCommitId());
+          System.out.println("  Mutation: " + record.getMutation());
+          if (verbose) {
+            System.out.println("  Serialized Length: " + 
record.getSerializedLength());
+          }
+        }
+      }
+
+      // Print trailer information
+      System.out.println("\nTrailer:");
+      System.out.println("  Record Count: " + 
reader.getTrailer().getRecordCount());
+      System.out.println("  Block Count: " + 
reader.getTrailer().getBlockCount());
+      System.out.println("  Blocks Start Offset: " + 
reader.getTrailer().getBlocksStartOffset());
+      System.out.println("  Trailer Start Offset: " + 
reader.getTrailer().getTrailerStartOffset());
+
+      // Print verification results if checking
+      if (check) {
+        System.out.println("\nVerification Results:");
+        System.out.println("  Records Read: " + context.getRecordsRead());
+        System.out.println("  Blocks Read: " + context.getBlocksRead());
+        System.out.println("  Corrupt Blocks Skipped: " + 
context.getCorruptBlocksSkipped());
+
+        if (context.getCorruptBlocksSkipped() > 0) {
+          System.out.println("  WARNING: File contains corrupt blocks!");
+        } else if (context.getRecordsRead() == 
reader.getTrailer().getRecordCount()) {
+          System.out.println("  File integrity verified successfully");
+        } else {
+          System.out.println("  WARNING: Record count mismatch!");
+        }
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  private boolean parseArgs(String[] args) {
+    if (args.length == 0) {
+      return false;
+    }
+    for (int i = 0; i < args.length - 1; i++) {
+      String arg = args[i];
+      switch (arg) {
+        case "-h":
+        case "--help":
+          return false;
+        case "-v":
+        case "--verbose":
+          verbose = true;
+          break;
+        case "-c":
+        case "--check":
+          check = true;
+          break;
+        case "-d":
+        case "--decode":
+          decode = true;
+          break;
+        default:
+          if (arg.startsWith("-")) {
+            System.err.println("Unknown option: " + arg);
+            return false;
+          }
+      }
+    }
+    return true;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new LogFileAnalyzer(), args);
+    System.exit(res);
+  }
+
+}
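
Besides the command-line entry point above, the analyzer can also be driven programmatically through ToolRunner; a sketch, with the flag and path arguments chosen only for illustration:

    Configuration conf = HBaseConfiguration.create();
    int exitCode = ToolRunner.run(conf, new LogFileAnalyzer(),
        new String[] { "-c", "-d", "/path/to/log.plog" });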
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/README.md
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/README.md
new file mode 100644
index 0000000000..c6afc361d9
--- /dev/null
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/README.md
@@ -0,0 +1,70 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Phoenix Replication Log File Analyzer
+
+A command-line tool for analyzing Phoenix Replication Log files.
+
+This tool can read a log file or a directory of log files, print the header,
+trailer (if present), and block headers, optionally decode and print the
+LogRecord contents in a human-readable format, verify checksums for each block,
+and report any corruption or format violations.
+
+## Usage
+
+```bash
+hadoop jar phoenix-server.jar \
+    org.apache.phoenix.replication.tool.LogFileAnalyzer [options] \
+    <log-file-or-directory>
+```
+
+### Options
+
+- `-h, --help`: Print help message
+- `-v, --verbose`: Print verbose output including block headers
+- `-c, --check`: Verify checksums and report any corruption
+- `-d, --decode`: Decode and print record contents in human-readable format
+
+### Examples
+
+#### Basic analysis of a log file:
+
+```bash
+hadoop jar phoenix-server.jar \
+    org.apache.phoenix.replication.tool.LogFileAnalyzer /path/to/log.plog
+```
+
+#### Analyze with record decoding:
+
+```bash
+hadoop jar phoenix-server.jar \
+    org.apache.phoenix.replication.tool.LogFileAnalyzer -d /path/to/log.plog
+```
+
+#### Verify checksums and report corruption:
+
+```bash
+hadoop jar phoenix-server.jar \
+    org.apache.phoenix.replication.tool.LogFileAnalyzer -c /path/to/log.plog
+```
+
+#### Analyze all log files in a directory with verbose output:
+
+```bash
+hadoop jar phoenix-server.jar \
+    org.apache.phoenix.replication.tool.LogFileAnalyzer -v /path/to/logs/
+```
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/package-info.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/package-info.java
new file mode 100644
index 0000000000..5c512daa76
--- /dev/null
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package containing tools for Phoenix replication functionality.
+ */
+package org.apache.phoenix.replication.tool;
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
new file mode 100644
index 0000000000..ced1527400
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
@@ -0,0 +1,1344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.phoenix.replication.ReplicationLog.RotationReason;
+import org.apache.phoenix.replication.log.LogFile;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.apache.phoenix.replication.log.LogFileTestUtil;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogSource;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogSourceImpl;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicationLogTest {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationLogTest.class);
+
+  @ClassRule
+  public static TemporaryFolder testFolder = new TemporaryFolder();
+
+  private Configuration conf;
+  private ServerName serverName;
+  private FileSystem localFs;
+  private URI standbyUri;
+  private ReplicationLog logWriter;
+
+  static final int TEST_RINGBUFFER_SIZE = 32;
+  static final int TEST_SYNC_TIMEOUT = 1000;
+  static final int TEST_ROTATION_TIME = 5000;
+  static final int TEST_ROTATION_SIZE_BYTES = 10 * 1024;
+
+  @Before
+  public void setUp() throws IOException {
+    conf = HBaseConfiguration.create();
+    localFs = FileSystem.getLocal(conf);
+    standbyUri = new Path(testFolder.getRoot().toString()).toUri();
+    serverName = ServerName.valueOf("test", 60010, 
EnvironmentEdgeManager.currentTimeMillis());
+    conf.set(ReplicationLog.REPLICATION_STANDBY_HDFS_URL_KEY, 
standbyUri.toString());
+    // Small ring buffer size for testing
+    conf.setInt(ReplicationLog.REPLICATION_LOG_RINGBUFFER_SIZE_KEY, 
TEST_RINGBUFFER_SIZE);
+    // Set a short sync timeout for testing
+    conf.setLong(ReplicationLog.REPLICATION_LOG_SYNC_TIMEOUT_KEY, 
TEST_SYNC_TIMEOUT);
+    // Set a short rotation time for testing
+    conf.setLong(ReplicationLog.REPLICATION_LOG_ROTATION_TIME_MS_KEY, 
TEST_ROTATION_TIME);
+    // Small size threshold for testing
+    conf.setLong(ReplicationLog.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY, 
TEST_ROTATION_SIZE_BYTES);
+
+    logWriter = spy(new TestableReplicationLog(conf, serverName));
+    logWriter.init();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (logWriter != null) {
+      logWriter.close();
+    }
+    // Deregister the metrics source that the replication log registers during initialization,
+    // so the next unit test can register it again and initialize successfully.
+    DefaultMetricsSystem.instance()
+      .unregisterSource(MetricsReplicationLogSource.METRICS_JMX_CONTEXT);
+  }
+
+  /**
+   * Tests basic append and sync functionality of the replication log. 
Verifies that mutations are
+   * correctly appended to the log and that sync operations properly commit 
the changes to disk.
+   */
+  @Test
+  public void testAppendAndSync() throws Exception {
+    final String tableName = "TESTTBL";
+    final long commitId1 = 1L;
+    final long commitId2 = 2L;
+    final long commitId3 = 3L;
+    final long commitId4 = 4L;
+    final long commitId5 = 5L;
+    final Mutation put1 = LogFileTestUtil.newPut("row1", 1, 1);
+    final Mutation put2 = LogFileTestUtil.newPut("row2", 2, 1);
+    final Mutation put3 = LogFileTestUtil.newPut("row3", 3, 1);
+    final Mutation put4 = LogFileTestUtil.newPut("row4", 4, 1);
+    final Mutation put5 = LogFileTestUtil.newPut("row5", 5, 1);
+
+    // Get the inner writer
+    LogFileWriter writer = logWriter.getWriter();
+    assertNotNull("Writer should not be null", writer);
+    InOrder inOrder = Mockito.inOrder(writer);
+
+    logWriter.append(tableName, commitId1, put1);
+    logWriter.append(tableName, commitId2, put2);
+    logWriter.append(tableName, commitId3, put3);
+    logWriter.append(tableName, commitId4, put4);
+    logWriter.append(tableName, commitId5, put5);
+
+    logWriter.sync();
+
+    // Happens-before ordering verification, using Mockito's inOrder. Verify 
that the appends
+    // happen before sync, and sync happened after appends.
+    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId1), 
eq(put1));
+    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId2), 
eq(put2));
+    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId3), 
eq(put3));
+    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId4), 
eq(put4));
+    inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId5), 
eq(put5));
+    inOrder.verify(writer, times(1)).sync();
+  }
+
+  /**
+   * Tests the behavior when an append operation fails. Verifies that the 
system properly handles
+   * append failures by rolling to a new writer and retrying the operation.
+   */
+  @Test
+  public void testAppendFailureAndRetry() throws Exception {
+    final String tableName = "TBLAFR";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    // Get the inner writer
+    LogFileWriter writerBeforeRoll = logWriter.getWriter();
+    assertNotNull("Initial writer should not be null", writerBeforeRoll);
+
+    // Configure writerBeforeRoll to fail on the first append call
+    doThrow(new IOException("Simulated append 
failure")).when(writerBeforeRoll).append(anyString(),
+      anyLong(), any(Mutation.class));
+
+    // Append data
+    logWriter.append(tableName, commitId, put);
+    logWriter.sync();
+
+    // Get the inner writer we rolled to.
+    LogFileWriter writerAfterRoll = logWriter.getWriter();
+    assertNotNull("Rolled writer should not be null", writerAfterRoll);
+
+    // Verify the sequence: append (fail), rotate, append (succeed), sync
+    InOrder inOrder = Mockito.inOrder(writerBeforeRoll, writerAfterRoll);
+    inOrder.verify(writerBeforeRoll, times(1)).append(eq(tableName), 
eq(commitId), eq(put));
+    inOrder.verify(writerBeforeRoll, times(0)).sync(); // Append failed, so sync was never attempted
+    inOrder.verify(writerAfterRoll, times(1)).append(eq(tableName), 
eq(commitId), eq(put)); // Retry
+    inOrder.verify(writerAfterRoll, times(1)).sync();
+  }
+
+  /**
+   * Tests the behavior when a sync operation fails. Verifies that the system 
properly handles sync
+   * failures by rolling to a new writer and retrying the operation.
+   */
+  @Test
+  public void testSyncFailureAndRetry() throws Exception {
+    final String tableName = "TBLSFR";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    // Get the inner writer
+    LogFileWriter writerBeforeRoll = logWriter.getWriter();
+    assertNotNull("Initial writer should not be null", writerBeforeRoll);
+
+    // Configure writerBeforeRoll to fail on the first sync call
+    doThrow(new IOException("Simulated sync 
failure")).when(writerBeforeRoll).sync();
+
+    // Append data
+    logWriter.append(tableName, commitId, put);
+    logWriter.sync();
+
+    // Get the inner writer we rolled to.
+    LogFileWriter writerAfterRoll = logWriter.getWriter();
+    assertNotNull("Initial writer should not be null", writerBeforeRoll);
+
+    // Verify the sequence: append, sync (fail), rotate, append (retry), sync 
(succeed)
+    InOrder inOrder = Mockito.inOrder(writerBeforeRoll, writerAfterRoll);
+    inOrder.verify(writerBeforeRoll, times(1)).append(eq(tableName), 
eq(commitId), eq(put));
+    inOrder.verify(writerBeforeRoll, times(1)).sync(); // Failed
+    inOrder.verify(writerAfterRoll, times(1)).append(eq(tableName), 
eq(commitId), eq(put)); // Replay
+    inOrder.verify(writerAfterRoll, times(1)).sync(); // Succeeded
+  }
+
+  /**
+   * Tests the blocking behavior when the ring buffer is full. Verifies that 
append operations block
+   * when the ring buffer is full and resume as soon as space becomes 
available again.
+   */
+  @Test
+  public void testBlockingWhenRingFull() throws Exception {
+    final String tableName = "TBLBWRF";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    long commitId = 0;
+
+    // Get the inner writer
+    LogFileWriter innerWriter = logWriter.getWriter();
+    assertNotNull("Inner writer should not be null", innerWriter);
+
+    // Create a slow consumer to fill up the ring buffer.
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(50); // Simulate slow processing
+        return invocation.callRealMethod();
+      }
+    }).when(innerWriter).append(anyString(), anyLong(), any(Mutation.class));
+
+    // Fill up the ring buffer by sending enough events.
+    for (int i = 0; i < TEST_RINGBUFFER_SIZE; i++) {
+      logWriter.append(tableName, commitId++, put);
+    }
+
+    // Now try to append when the ring is full. This should block until space 
becomes
+    // available.
+    long myCommitId = commitId++;
+    CompletableFuture<Void> startFuture = new CompletableFuture<>();
+    CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+    Thread appendThread = new Thread(() -> {
+      try {
+        startFuture.complete(null);
+        logWriter.append(tableName, myCommitId, put);
+        appendFuture.complete(null);
+      } catch (IOException e) {
+        appendFuture.completeExceptionally(e);
+      }
+    });
+    appendThread.start();
+
+    // Wait for the append thread.
+    startFuture.get();
+
+    // Verify the append is still blocked
+    assertFalse("Append should be blocked when ring is full", 
appendFuture.isDone());
+
+    // Let some events process to free up space.
+    Thread.sleep(100);
+
+    // Now the append should complete. If anything goes wrong, we will time out here.
+    appendFuture.get();
+    assertTrue("Append should have completed", appendFuture.isDone());
+
+    // Verify the append eventually happens on the writer.
+    verify(innerWriter, timeout(10000).times(1)).append(eq(tableName), 
eq(myCommitId), any());
+  }
+
+  /**
+   * Tests the sync timeout behavior. Verifies that sync operations time out 
after the configured
+   * interval if they cannot complete.
+   */
+  @Test
+  public void testSyncTimeout() throws Exception {
+    final String tableName = "TBLST";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    // Get the inner writer
+    LogFileWriter innerWriter = logWriter.getWriter();
+    assertNotNull("Inner writer should not be null", innerWriter);
+
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        // Pause long enough to cause a timeout.
+        Thread.sleep((long) (TEST_SYNC_TIMEOUT * 1.25));
+        return invocation.callRealMethod();
+      }
+    }).when(innerWriter).sync();
+
+    // Append some data
+    logWriter.append(tableName, commitId, put);
+
+    // Try to sync and expect it to timeout
+    try {
+      logWriter.sync();
+      fail("Expected sync to timeout");
+    } catch (IOException e) {
+      assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
+    }
+  }
+
+  /**
+   * Tests concurrent append operations from multiple producers. Verifies that 
the system correctly
+   * handles concurrent appends from multiple threads and maintains data 
consistency.
+   */
+  @Test
+  public void testConcurrentProducers() throws Exception {
+    final String tableName = "TBLCP";
+    final int APPENDS_PER_THREAD = 1000;
+    // Create a latch to coordinate thread starts
+    final CountDownLatch startLatch = new CountDownLatch(1);
+    // Create a latch to track completion of all appends
+    final CountDownLatch completionLatch = new CountDownLatch(2);
+
+    // Get the inner writer
+    LogFileWriter innerWriter = logWriter.getWriter();
+    assertNotNull("Inner writer should not be null", innerWriter);
+
+    // Thread 1: Append mutations with even commit IDs
+    Thread producerEven = new Thread(() -> {
+      try {
+        startLatch.await(); // Wait for start signal
+        for (int i = 0; i < APPENDS_PER_THREAD; i++) {
+          final long commitId = i * 2;
+          final Mutation put = LogFileTestUtil.newPut("row" + commitId, 
commitId, 1);
+          logWriter.append(tableName, commitId, put);
+        }
+      } catch (Exception e) {
+        fail("Producer 1 failed: " + e.getMessage());
+      } finally {
+        completionLatch.countDown();
+      }
+    });
+
+    // Thread 2: Append mutations with odd commit IDs
+    Thread producerOdd = new Thread(() -> {
+      try {
+        startLatch.await(); // Wait for start signal
+        for (int i = 0; i < APPENDS_PER_THREAD; i++) {
+          final long commitId = i * 2 + 1;
+          final Mutation put = LogFileTestUtil.newPut("row" + commitId, 
commitId, 1);
+          logWriter.append(tableName, commitId, put);
+        }
+      } catch (Exception e) {
+        fail("Producer 2 failed: " + e.getMessage());
+      } finally {
+        completionLatch.countDown();
+      }
+    });
+
+    // Start both threads.
+    producerEven.start();
+    producerOdd.start();
+    // Signal threads to start.
+    startLatch.countDown();
+    // Wait for all appends to complete
+    completionLatch.await();
+
+    // Perform a sync to ensure all appends are processed.
+    InOrder inOrder = Mockito.inOrder(innerWriter); // To verify the below 
sync.
+    logWriter.sync();
+    // Verify the final sync was called.
+    inOrder.verify(innerWriter, times(1)).sync();
+
+    // Verify that all of the appends were processed by the internal writer.
+    for (int i = 0; i < APPENDS_PER_THREAD * 2; i++) {
+      final long commitId = i;
+      verify(innerWriter, times(1)).append(eq(tableName), eq(commitId), any());
+    }
+
+  }
+
+  /**
+   * Tests time-based log rotation. Verifies that the log file is rotated 
after the configured
+   * rotation time period and that operations continue correctly with the new 
log file.
+   */
+  @Test
+  public void testTimeBasedRotation() throws Exception {
+    final String tableName = "TBLTBR";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    final long commitId = 1L;
+
+    // Get the initial writer
+    LogFileWriter writerBeforeRotation = logWriter.getWriter();
+    assertNotNull("Initial writer should not be null", writerBeforeRotation);
+
+    // Append some data
+    logWriter.append(tableName, commitId, put);
+    logWriter.sync();
+
+    // Wait for rotation time to elapse
+    Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
+
+    // Append more data to trigger rotation check
+    logWriter.append(tableName, commitId + 1, put);
+    logWriter.sync();
+
+    // Get the new writer after rotation
+    LogFileWriter writerAfterRotation = logWriter.getWriter();
+    assertNotNull("New writer should not be null", writerAfterRotation);
+    assertTrue("Writer should have been rotated", writerAfterRotation != 
writerBeforeRotation);
+
+    // Verify the sequence of operations
+    InOrder inOrder = Mockito.inOrder(writerBeforeRotation, 
writerAfterRotation);
+    // First append went to the initial writer.
+    inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName), eq(commitId), eq(put));
+    inOrder.verify(writerBeforeRotation, times(1)).sync();
+    // The first append is not replayed to the new writer.
+    inOrder.verify(writerAfterRotation, times(0)).append(eq(tableName), eq(commitId), eq(put));
+    // The second append went to the new writer.
+    inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName), eq(commitId + 1), eq(put));
+    inOrder.verify(writerAfterRotation, times(1)).sync();
+  }
+
+  /**
+   * Tests size-based log rotation. Verifies that the log file is rotated when 
it exceeds the
+   * configured size threshold and that operations continue correctly with the 
new log file.
+   */
+  @Test
+  public void testSizeBasedRotation() throws Exception {
+    final String tableName = "TBLSBR";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 10);
+    long commitId = 1L;
+
+    LogFileWriter writerBeforeRotation = logWriter.getWriter();
+    assertNotNull("Initial writer should not be null", writerBeforeRotation);
+
+    // Append enough data so that we exceed the size threshold.
+    for (int i = 0; i < 100; i++) {
+      logWriter.append(tableName, commitId++, put);
+    }
+    logWriter.sync(); // Should trigger a size-based rotation
+
+    // Get the new writer after the expected rotation.
+    LogFileWriter writerAfterRotation = logWriter.getWriter();
+    assertNotNull("New writer should not be null", writerAfterRotation);
+    assertTrue("Writer should have been rotated", writerAfterRotation != 
writerBeforeRotation);
+
+    // Append one more mutation to verify we're using the new writer.
+    logWriter.append(tableName, commitId, put);
+    logWriter.sync();
+
+    // Verify the sequence of operations
+    InOrder inOrder = Mockito.inOrder(writerBeforeRotation, 
writerAfterRotation);
+    // Verify all appends before rotation went to the first writer.
+    for (int i = 1; i < commitId; i++) {
+      inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName), 
eq((long) i), eq(put));
+    }
+    inOrder.verify(writerBeforeRotation, times(1)).sync();
+    // Verify the final append went to the new writer.
+    inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName), 
eq(commitId), eq(put));
+    inOrder.verify(writerAfterRotation, times(1)).sync();
+  }
+
+  /**
+   * Tests the close operation of the replication log. Verifies that the log 
properly closes its
+   * resources and prevents further operations after being closed.
+   */
+  @Test
+  public void testClose() throws Exception {
+    final String tableName = "TBLCLOSE";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    final long commitId = 1L;
+
+    // Get the inner writer
+    LogFileWriter innerWriter = logWriter.getWriter();
+    assertNotNull("Inner writer should not be null", innerWriter);
+
+    // Append some data
+    logWriter.append(tableName, commitId, put);
+
+    // Close the log writer
+    logWriter.close();
+
+    // Verify the inner writer was closed
+    verify(innerWriter, times(1)).close();
+
+    // Verify we can't append after close
+    try {
+      logWriter.append(tableName, commitId + 1, put);
+      fail("Expected append to fail after close");
+    } catch (IOException e) {
+      // Expected
+    }
+
+    // Verify we can't sync after close
+    try {
+      logWriter.sync();
+      fail("Expected sync to fail after close");
+    } catch (IOException e) {
+      // Expected
+    }
+
+    // Verify we can close multiple times without error
+    logWriter.close();
+  }
+
+  /**
+   * Tests the automatic rotation task. Verifies that the background rotation 
task correctly rotates
+   * log files based on the configured rotation time.
+   */
+  @Test
+  public void testRotationTask() throws Exception {
+    final String tableName = "TBLRT";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    long commitId = 1L;
+
+    LogFileWriter writerBeforeRotation = logWriter.getWriter();
+    assertNotNull("Initial writer should not be null", writerBeforeRotation);
+
+    // Append some data and wait for the rotation time to elapse plus a small 
buffer.
+    logWriter.append(tableName, commitId, put);
+    logWriter.sync();
+    Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
+
+    // Get the new writer after the rotation.
+    LogFileWriter writerAfterRotation = logWriter.getWriter();
+    assertNotNull("New writer should not be null", writerAfterRotation);
+    assertTrue("Writer should have been rotated", writerAfterRotation != 
writerBeforeRotation);
+
+    // Verify first append and sync went to initial writer
+    verify(writerBeforeRotation, times(1)).append(eq(tableName), eq(1L), 
eq(put));
+    verify(writerBeforeRotation, times(1)).sync();
+    // Verify the initial writer was closed
+    verify(writerBeforeRotation, times(1)).close();
+  }
+
+  /**
+   * Tests behavior when log rotation fails temporarily but eventually 
succeeds. Verifies that:
+   * <ul>
+   * <li>The system can handle temporary rotation failures</li>
+   * <li>After failing twice, the third rotation attempt succeeds</li>
+   * <li>Operations continue correctly with the new writer after successful 
rotation</li>
+   * <li>The metrics for rotation failures are properly tracked</li>
+   * <li>Operations can continue with the current writer while rotation 
attempts are failing</li>
+   * </ul>
+   * <p>
+   * This test simulates a scenario where the first two rotation attempts fail 
(e.g., due to
+   * temporary HDFS issues) but the third attempt succeeds. This is a common 
real-world scenario
+   * where transient failures occur but the system eventually recovers. During 
the failed rotation
+   * attempts, the system should continue to operate normally with the current 
writer.
+   */
+  @Test
+  public void testFailedRotation() throws Exception {
+    final String tableName = "TBLFR";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    long commitId = 1L;
+
+    // Get the initial writer
+    LogFileWriter initialWriter = logWriter.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // Configure the log writer to fail only the first time when creating new 
writers.
+    AtomicBoolean shouldFail = new AtomicBoolean(true);
+    doAnswer(invocation -> {
+      if (shouldFail.getAndSet(false)) {
+        throw new IOException("Simulated failure to create new writer");
+      }
+      return invocation.callRealMethod();
+    }).when(logWriter).createNewWriter(any(FileSystem.class), any(URI.class));
+
+    // Append some data
+    logWriter.append(tableName, commitId, put);
+    logWriter.sync();
+
+    // Rotate the log.
+    LogFileWriter writerAfterFailedRotate = 
logWriter.rotateLog(RotationReason.TIME);
+    assertEquals("Should still be using the initial writer", initialWriter,
+      writerAfterFailedRotate);
+
+    // While rotation is failing, verify we can continue to use the current 
writer.
+    logWriter.append(tableName, commitId + 1, put);
+    logWriter.sync();
+
+    LogFileWriter writerAfterRotate = logWriter.rotateLog(RotationReason.TIME);
+    assertNotEquals("Should be using a new writer", initialWriter, 
writerAfterRotate);
+
+    // Try to append more data. This should work with the new writer after 
successful rotation.
+    logWriter.append(tableName, commitId + 2, put);
+    logWriter.sync();
+
+    // Verify operations went to the writers in the correct order
+    InOrder inOrder = Mockito.inOrder(initialWriter, writerAfterRotate);
+    // First append and sync on initial writer.
+    inOrder.verify(initialWriter).append(eq(tableName), eq(commitId), eq(put));
+    inOrder.verify(initialWriter).sync();
+    // Second append and sync on initial writer after failed rotation.
+    inOrder.verify(initialWriter).append(eq(tableName), eq(commitId + 1), 
eq(put));
+    inOrder.verify(initialWriter).sync();
+    // Final append and sync on new writer after successful rotation.
+    inOrder.verify(writerAfterRotate).append(eq(tableName), eq(commitId + 2), 
eq(put));
+    inOrder.verify(writerAfterRotate).sync();
+  }
+
+  /**
+   * This test simulates a scenario where rotation consistently fails and 
verifies that the system
+   * properly propagates an exception after exhausting all retry attempts.
+   */
+  @Test
+  public void testTooManyRotationFailures() throws Exception {
+    final String tableName = "TBLTMRF";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    long commitId = 1L;
+
+    LogFileWriter initialWriter = logWriter.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // Configure the log writer to always fail when creating new writers
+    doThrow(new IOException("Simulated failure to create new 
writer")).when(logWriter)
+      .createNewWriter(any(FileSystem.class), any(URI.class));
+
+    // Append some data
+    logWriter.append(tableName, commitId, put);
+    logWriter.sync();
+
+    // Try to rotate the log multiple times until we exceed the retry limit
+    for (int i = 0; i <= 
ReplicationLog.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES; i++) {
+      try {
+        logWriter.rotateLog(RotationReason.TIME);
+      } catch (IOException e) {
+        if (i < ReplicationLog.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES) {
+          // Not the last attempt yet, continue
+          continue;
+        }
+        // This was the last attempt, verify the exception
+        assertTrue("Expected IOException", e instanceof IOException);
+        assertTrue("Expected our mocked failure cause",
+          e.getMessage().contains("Simulated failure"));
+
+      }
+    }
+
+    // Verify subsequent operations fail because the log is closed
+    try {
+      logWriter.append(tableName, commitId + 1, put);
+      logWriter.sync();
+      fail("Expected append to fail because log is closed");
+    } catch (IOException e) {
+      assertTrue("Expected an IOException because log is closed",
+        e.getMessage().contains("Closed"));
+    }
+  }
+
+  /**
+   * Tests handling of critical exceptions during event processing. Verifies 
that the system
+   * properly handles critical errors by closing the log and preventing 
further operations.
+   */
+  @Test
+  public void testEventProcessingException() throws Exception {
+    final String tableName = "TBLEPE";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    // Get the inner writer
+    LogFileWriter innerWriter = logWriter.getWriter();
+    assertNotNull("Writer should not be null", innerWriter);
+
+    // Configure writer to throw a RuntimeException on append
+    doThrow(new RuntimeException("Simulated critical 
error")).when(innerWriter).append(anyString(),
+      anyLong(), any(Mutation.class));
+
+    // Append data. This should trigger the LogExceptionHandler, which will 
close logWriter.
+    logWriter.append(tableName, commitId, put);
+    try {
+      logWriter.sync();
+      fail("Should have thrown IOException because sync timed out");
+    } catch (IOException e) {
+      assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
+    }
+
+    // Verify that subsequent operations fail because the log is closed
+    try {
+      logWriter.append(tableName, commitId + 1, put);
+      fail("Should have thrown IOException because log is closed");
+    } catch (IOException e) {
+      assertTrue("Expected an IOException because log is closed",
+        e.getMessage().contains("Closed"));
+    }
+
+    // Verify that the inner writer was closed by the LogExceptionHandler
+    verify(innerWriter, times(1)).close();
+  }
+
+  /**
+   * Tests behavior when all sync retry attempts are exhausted. Verifies that 
the system properly
+   * handles the case where sync operations fail repeatedly and eventually 
timeout.
+   */
+  @Test
+  public void testSyncFailureAllRetriesExhausted() throws Exception {
+    final String tableName = "TBLSAFR";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    // Get the initial writer
+    LogFileWriter initialWriter = logWriter.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // Configure initial writer to fail on sync
+    doThrow(new IOException("Simulated sync 
failure")).when(initialWriter).sync();
+
+    // createNewWriter should keep returning the bad writer
+    doAnswer(invocation -> 
initialWriter).when(logWriter).createNewWriter(any(FileSystem.class),
+      any(URI.class));
+
+    // Append data
+    logWriter.append(tableName, commitId, put);
+
+    // Try to sync. Should fail after exhausting retries.
+    try {
+      logWriter.sync();
+      fail("Expected sync to fail after exhausting retries");
+    } catch (IOException e) {
+      assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
+    }
+
+    // Each retry creates a new writer, so that is at least 1 create + 5 retries.
+    verify(logWriter, atLeast(6)).createNewWriter(any(FileSystem.class), any(URI.class));
+  }
+
+  /**
+   * Tests log rotation behavior during batch operations. Verifies that the system correctly handles
+   * rotation when there are pending batch operations, ensuring no data loss.
+   */
+  @Test
+  public void testRotationDuringBatch() throws Exception {
+    final String tableName = "TBLRDB";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    long commitId = 1L;
+
+    // Get the initial writer
+    LogFileWriter writerBeforeRotation = logWriter.getWriter();
+    assertNotNull("Initial writer should not be null", writerBeforeRotation);
+
+    // Append several items to fill currentBatch but don't sync yet
+    for (int i = 0; i < 5; i++) {
+      logWriter.append(tableName, commitId + i, put);
+    }
+
+    // Force a rotation by waiting for rotation time to elapse
+    Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
+
+    // Get the new writer after rotation
+    LogFileWriter writerAfterRotation = logWriter.getWriter();
+    assertNotNull("New writer should not be null", writerAfterRotation);
+    assertTrue("Writer should have been rotated", writerAfterRotation != 
writerBeforeRotation);
+
+    // Now trigger a sync which should replay the currentBatch to the new writer
+    logWriter.sync();
+
+    // Verify the sequence of operations
+    InOrder inOrder = Mockito.inOrder(writerBeforeRotation, writerAfterRotation);
+
+    // Verify all appends before rotation went to the first writer
+    for (int i = 0; i < 5; i++) {
+      inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName), eq(commitId + i),
+        eq(put));
+    }
+
+    // Verify the currentBatch was replayed to the new writer
+    for (int i = 0; i < 5; i++) {
+      inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName), eq(commitId + i),
+        eq(put));
+    }
+
+    // Verify sync happened on the new writer
+    inOrder.verify(writerAfterRotation, times(1)).sync();
+
+    // Verify the initial writer was closed
+    verify(writerBeforeRotation, times(1)).close();
+  }
+
+  /**
+   * Tests reading records after writing them to the log. Verifies that records written to the log
+   * can be correctly read back and match the original data.
+   */
+  @Test
+  public void testReadAfterWrite() throws Exception {
+    final String tableName = "TBLRAW";
+    final int NUM_RECORDS = 100;
+    List<LogFile.Record> originalRecords = new ArrayList<>();
+
+    // Get the path of the log file.
+    Path logPath = logWriter.getWriter().getContext().getFilePath();
+
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      LogFile.Record record = LogFileTestUtil.newPutRecord(tableName, i, "row" + i, i, 1);
+      originalRecords.add(record);
+      logWriter.append(record.getHBaseTableName(), record.getCommitId(), record.getMutation());
+    }
+    logWriter.sync(); // Sync to commit the appends to the current writer.
+
+    // Force a rotation to close the current writer.
+    logWriter.rotateLog(ReplicationLog.RotationReason.SIZE);
+
+    assertTrue("Log file should exist", localFs.exists(logPath));
+
+    // Read and verify all records
+    LogFileReader reader = new LogFileReader();
+    LogFileReaderContext readerContext =
+      new LogFileReaderContext(conf).setFileSystem(localFs).setFilePath(logPath);
+    reader.init(readerContext);
+
+    List<LogFile.Record> readRecords = new ArrayList<>();
+    LogFile.Record record;
+    while ((record = reader.next()) != null) {
+      readRecords.add(record);
+    }
+
+    reader.close();
+
+    // Verify we have the expected number of records.
+    assertEquals("Number of records mismatch", NUM_RECORDS, 
readRecords.size());
+
+    // Verify each record matches the original.
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      LogFileTestUtil.assertRecordEquals("Record mismatch at index " + i, 
originalRecords.get(i),
+        readRecords.get(i));
+    }
+  }
+
+  /**
+   * Tests reading records after multiple log rotations. Verifies that records can be correctly read
+   * across multiple log files after several rotations, maintaining data consistency.
+   */
+  @Test
+  public void testReadAfterMultipleRotations() throws Exception {
+    final String tableName = "TBLRAMR";
+    final int NUM_RECORDS_PER_ROTATION = 100;
+    final int NUM_ROTATIONS = 10;
+    final int TOTAL_RECORDS = NUM_RECORDS_PER_ROTATION * NUM_ROTATIONS;
+    List<LogFile.Record> originalRecords = new ArrayList<>();
+    List<Path> logPaths = new ArrayList<>();
+
+    // Write records across multiple rotations.
+    for (int rotation = 0; rotation < NUM_ROTATIONS; rotation++) {
+      // Get the path of the current log file.
+      Path logPath = logWriter.getWriter().getContext().getFilePath();
+      logPaths.add(logPath);
+
+      for (int i = 0; i < NUM_RECORDS_PER_ROTATION; i++) {
+        int commitId = (rotation * NUM_RECORDS_PER_ROTATION) + i;
+        LogFile.Record record =
+          LogFileTestUtil.newPutRecord(tableName, commitId, "row" + commitId, commitId, 1);
+        originalRecords.add(record);
+        logWriter.append(record.getHBaseTableName(), record.getCommitId(), record.getMutation());
+      }
+      logWriter.sync(); // Sync to commit the appends to the current writer.
+      // Force a rotation to close the current writer.
+      logWriter.rotateLog(ReplicationLog.RotationReason.SIZE);
+    }
+
+    // Verify all log files exist
+    for (Path logPath : logPaths) {
+      assertTrue("Log file should exist: " + logPath, localFs.exists(logPath));
+    }
+
+    // Read and verify all records from each log file, in the order in which the log files
+    // were written.
+    List<LogFile.Record> readRecords = new ArrayList<>();
+    for (Path logPath : logPaths) {
+      LogFileReader reader = new LogFileReader();
+      LogFileReaderContext readerContext =
+        new LogFileReaderContext(conf).setFileSystem(localFs).setFilePath(logPath);
+      reader.init(readerContext);
+
+      LogFile.Record record;
+      while ((record = reader.next()) != null) {
+        readRecords.add(record);
+      }
+      reader.close();
+    }
+
+    // Verify we have the expected number of records.
+    assertEquals("Total number of records mismatch", TOTAL_RECORDS, 
readRecords.size());
+
+    // Verify each record matches the original. This confirms the total ordering of all records
+    // in all files.
+    for (int i = 0; i < TOTAL_RECORDS; i++) {
+      LogFileTestUtil.assertRecordEquals("Record mismatch at index " + i, 
originalRecords.get(i),
+        readRecords.get(i));
+    }
+  }
+
+  /**
+   * Tests reading records after multiple rotations with intermittent syncs. If we do not sync when
+   * we roll a file, the in-flight batch is replayed into the new writer when we do finally sync
+   * (with the new writer). Verifies that records can be correctly read even when syncs are not
+   * performed before each rotation, ensuring data consistency.
+   */
+  @Test
+  public void testReadAfterMultipleRotationsWithReplay() throws Exception {
+    final String tableName = "TBLRAMRIS";
+    final int NUM_RECORDS_PER_ROTATION = 100;
+    final int NUM_ROTATIONS = 10;
+    final int TOTAL_RECORDS = NUM_RECORDS_PER_ROTATION * NUM_ROTATIONS;
+    List<LogFile.Record> originalRecords = new ArrayList<>();
+    List<Path> logPaths = new ArrayList<>();
+
+    // Write records across multiple rotations, only syncing 50% of the time.
+    for (int rotation = 0; rotation < NUM_ROTATIONS; rotation++) {
+      // Get the path of the current log file.
+      Path logPath = logWriter.getWriter().getContext().getFilePath();
+      logPaths.add(logPath);
+
+      for (int i = 0; i < NUM_RECORDS_PER_ROTATION; i++) {
+        int commitId = (rotation * NUM_RECORDS_PER_ROTATION) + i;
+        LogFile.Record record =
+          LogFileTestUtil.newPutRecord(tableName, commitId, "row" + commitId, commitId, 1);
+        originalRecords.add(record);
+        logWriter.append(record.getHBaseTableName(), record.getCommitId(), record.getMutation());
+      }
+
+      // Only sync 50% of the time before rotation. To ensure we sync on the last file
+      // we are going to write, use 'rotation % 2 == 1' instead of 'rotation % 2 == 0'.
+      if (rotation % 2 == 1) {
+        logWriter.sync(); // Sync to commit the appends to the current writer.
+      }
+      // Force a rotation to close the current writer.
+      logWriter.rotateLog(ReplicationLog.RotationReason.SIZE);
+    }
+
+    // Verify all log files exist
+    for (Path logPath : logPaths) {
+      assertTrue("Log file should exist: " + logPath, localFs.exists(logPath));
+    }
+
+    // Read and verify all records from each log file, tracking unique records and duplicates.
+    Set<LogFile.Record> uniqueRecords = new HashSet<>();
+    List<LogFile.Record> allReadRecords = new ArrayList<>();
+
+    for (Path logPath : logPaths) {
+      LogFileReader reader = new LogFileReader();
+      LogFileReaderContext readerContext =
+        new LogFileReaderContext(conf).setFileSystem(localFs).setFilePath(logPath);
+      reader.init(readerContext);
+      LogFile.Record record;
+      while ((record = reader.next()) != null) {
+        allReadRecords.add(record);
+        uniqueRecords.add(record);
+      }
+      reader.close();
+    }
+
+    // Print statistics about duplicates for informational purposes.
+    LOG.info("{} total records across all files", allReadRecords.size());
+    LOG.info("{} unique records", uniqueRecords.size());
+    LOG.info("{} duplicate records", allReadRecords.size() - 
uniqueRecords.size());
+
+    // Verify we have all the expected unique records
+    assertEquals("Number of unique records mismatch", TOTAL_RECORDS, 
uniqueRecords.size());
+  }
+
+  /**
+   * Tests behavior when a RuntimeException occurs during writer.getLength() in shouldRotate().
+   * Verifies that the system properly handles critical errors by closing the log and preventing
+   * further operations.
+   */
+  @Test
+  public void testRuntimeExceptionDuringLengthCheck() throws Exception {
+    final String tableName = "TBLRDL";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    // Get the inner writer
+    LogFileWriter innerWriter = logWriter.getWriter();
+    assertNotNull("Writer should not be null", innerWriter);
+
+    // Configure writer to throw RuntimeException on getLength()
+    doThrow(new RuntimeException("Simulated critical 
error")).when(innerWriter).getLength();
+
+    // Append data. This should trigger the LogExceptionHandler, which will close logWriter.
+    logWriter.append(tableName, commitId, put);
+    try {
+      logWriter.sync();
+      fail("Should have thrown IOException because sync timed out");
+    } catch (IOException e) {
+      assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
+    }
+
+    // Verify that subsequent operations fail because the log is closed
+    try {
+      logWriter.append(tableName, commitId + 1, put);
+      fail("Should have thrown IOException because log is closed");
+    } catch (IOException e) {
+      assertTrue("Expected an IOException because log is closed",
+        e.getMessage().contains("Closed"));
+    }
+
+    // Verify that the inner writer was closed by the LogExceptionHandler
+    verify(innerWriter, times(1)).close();
+  }
+
+  /**
+   * Tests behavior when a RuntimeException occurs during append() after closeOnError() has been
+   * called. Verifies that the system properly rejects append operations after being closed.
+   */
+  @Test
+  public void testAppendAfterCloseOnError() throws Exception {
+    final String tableName = "TBLAAE";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    // Get the inner writer
+    LogFileWriter innerWriter = logWriter.getWriter();
+    assertNotNull("Writer should not be null", innerWriter);
+
+    // Configure writer to throw RuntimeException on append
+    doThrow(new RuntimeException("Simulated critical 
error")).when(innerWriter).append(anyString(),
+      anyLong(), any(Mutation.class));
+
+    // Append data to trigger closeOnError()
+    logWriter.append(tableName, commitId, put);
+    try {
+      logWriter.sync();
+      fail("Should have thrown IOException because sync timed out");
+    } catch (IOException e) {
+      assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
+    }
+
+    // Verify that subsequent append operations fail because the log is closed
+    try {
+      logWriter.append(tableName, commitId, put);
+      fail("Should have thrown IOException because log is closed");
+    } catch (IOException e) {
+      assertTrue("Expected an IOException because log is closed",
+        e.getMessage().contains("Closed"));
+    }
+
+    // Verify that the inner writer was closed by the LogExceptionHandler
+    verify(innerWriter, times(1)).close();
+  }
+
+  /**
+   * Tests behavior when a RuntimeException occurs during sync() after closeOnError() has been
+   * called. Verifies that the system properly rejects sync operations after being closed.
+   */
+  @Test
+  public void testSyncAfterCloseOnError() throws Exception {
+    final String tableName = "TBLSAE";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    // Get the inner writer
+    LogFileWriter innerWriter = logWriter.getWriter();
+    assertNotNull("Writer should not be null", innerWriter);
+
+    // Configure writer to throw RuntimeException on append
+    doThrow(new RuntimeException("Simulated critical 
error")).when(innerWriter).append(anyString(),
+      anyLong(), any(Mutation.class));
+
+    // Append data to trigger closeOnError()
+    logWriter.append(tableName, commitId, put);
+    try {
+      logWriter.sync();
+      fail("Should have thrown IOException because sync timed out");
+    } catch (IOException e) {
+      assertTrue("Expected timeout exception", e.getCause() instanceof 
TimeoutException);
+    }
+
+    // Verify that subsequent sync operations fail because the log is closed
+    try {
+      logWriter.sync();
+      fail("Should have thrown IOException because log is closed");
+    } catch (IOException e) {
+      assertTrue("Expected an IOException because log is closed",
+        e.getMessage().contains("Closed"));
+    }
+
+    // Verify that the inner writer was closed by the LogExceptionHandler
+    verify(innerWriter, times(1)).close();
+  }
+
+  /**
+   * Tests race condition between LogRotationTask and LogEventHandler when both try to rotate the
+   * writer simultaneously. Verifies that despite concurrent rotation attempts, the log is only
+   * rotated once. Uses latches to ensure true concurrency and verify the sequence of operations.
+   */
+  @Test
+  public void testConcurrentRotationAttempts() throws Exception {
+    final String tableName = "TBLCR";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    // Get the initial writer
+    LogFileWriter initialWriter = logWriter.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // Create latches to control timing and track rotation attempts
+    final CountDownLatch rotationTaskStarted = new CountDownLatch(1);
+    final CountDownLatch rotationTaskCanProceed = new CountDownLatch(1);
+    final CountDownLatch eventHandlerStarted = new CountDownLatch(1);
+    final CountDownLatch eventHandlerCanProceed = new CountDownLatch(1);
+    final AtomicInteger rotationCount = new AtomicInteger(0);
+    final CountDownLatch bothAttemptsStarted = new CountDownLatch(2);
+
+    // Configure the rotation task to pause at specific points and track attempts
+    doAnswer(invocation -> {
+      rotationTaskStarted.countDown(); // Signal that rotation task has started
+      bothAttemptsStarted.countDown(); // Signal that this attempt has started
+      rotationTaskCanProceed.await(); // Wait for permission to proceed
+      rotationCount.incrementAndGet(); // Track this rotation attempt
+      return invocation.callRealMethod();
+    }).when(logWriter).rotateLog(ReplicationLog.RotationReason.TIME);
+
+    // Configure the event handler to pause at specific points
+    doAnswer(invocation -> {
+      eventHandlerStarted.countDown(); // Signal that event handler has started
+      bothAttemptsStarted.countDown(); // Signal that this attempt has started
+      eventHandlerCanProceed.await(); // Wait for permission to proceed
+      return invocation.callRealMethod();
+    }).when(logWriter).getWriter();
+
+    // Start a thread that will trigger rotation via the background task
+    Thread rotationThread = new Thread(() -> {
+      try {
+        // Force rotation by waiting for rotation time
+        Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    });
+    rotationThread.start();
+
+    // Start append operation in main thread
+    logWriter.append(tableName, commitId, put);
+
+    // Wait for both attempts to start - this ensures true concurrency
+    assertTrue("Both rotation attempts should start",
+      bothAttemptsStarted.await(5, TimeUnit.SECONDS));
+
+    // Verify both attempts have started before proceeding
+    assertEquals("Both attempts should have started", 0, 
bothAttemptsStarted.getCount());
+
+    // Allow both operations to proceed simultaneously
+    eventHandlerCanProceed.countDown();
+    rotationTaskCanProceed.countDown();
+
+    // Wait for both operations to complete
+    rotationThread.join();
+
+    // Verify the final state
+    LogFileWriter finalWriter = logWriter.getWriter();
+    assertNotNull("Final writer should not be null", finalWriter);
+    assertTrue("Writer should have been rotated", finalWriter != 
initialWriter);
+
+    // Verify only one rotation actually occurred
+    assertEquals("Should have only one actual rotation", 1, 
rotationCount.get());
+
+    // Verify all operations completed successfully
+    logWriter.sync();
+
+    // Verify the sequence of operations through the latches
+    assertTrue("Rotation task should have started", 
rotationTaskStarted.getCount() == 0);
+    assertTrue("Event handler should have started", 
eventHandlerStarted.getCount() == 0);
+  }
+
+  /**
+   * Tests race condition between LogEventHandler retry loop and LogRotationTask when both try to
+   * rotate the writer simultaneously. Verifies that despite concurrent rotation attempts during a
+   * retry scenario, the log is only rotated once. Uses latches to ensure true concurrency and
+   * verify the sequence of operations.
+   */
+  @Test
+  public void testConcurrentRotationDuringRetry() throws Exception {
+    final String tableName = "TBLCRR";
+    final long commitId = 1L;
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    // Get the initial writer
+    LogFileWriter initialWriter = logWriter.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // Create latches to control timing and track rotation attempts
+    final CountDownLatch retryStarted = new CountDownLatch(1);
+    final CountDownLatch retryCanProceed = new CountDownLatch(1);
+    final CountDownLatch rotationTaskStarted = new CountDownLatch(1);
+    final CountDownLatch rotationTaskCanProceed = new CountDownLatch(1);
+    final AtomicInteger rotationCount = new AtomicInteger(0);
+    final CountDownLatch bothAttemptsStarted = new CountDownLatch(2);
+
+    // Configure the writer to fail on first append, succeed on retry
+    doAnswer(invocation -> {
+      retryStarted.countDown(); // Signal that retry has started
+      bothAttemptsStarted.countDown(); // Signal that this attempt has started
+      retryCanProceed.await(); // Wait for permission to proceed
+      return invocation.callRealMethod();
+    }).when(initialWriter).append(anyString(), anyLong(), any(Mutation.class));
+
+    // Configure the rotation task to pause at specific points and track attempts
+    doAnswer(invocation -> {
+      rotationTaskStarted.countDown(); // Signal that rotation task has started
+      bothAttemptsStarted.countDown(); // Signal that this attempt has started
+      rotationTaskCanProceed.await(); // Wait for permission to proceed
+      rotationCount.incrementAndGet(); // Track this rotation attempt
+      return invocation.callRealMethod();
+    }).when(logWriter).rotateLog(ReplicationLog.RotationReason.TIME);
+
+    // Start a thread that will trigger rotation via the background task
+    Thread rotationThread = new Thread(() -> {
+      try {
+        // Force rotation by waiting for rotation time
+        Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    });
+    rotationThread.start();
+
+    // Start append operation in main thread
+    logWriter.append(tableName, commitId, put);
+
+    // Wait for both attempts to start - this ensures true concurrency
+    assertTrue("Both rotation attempts should start",
+      bothAttemptsStarted.await(5, TimeUnit.SECONDS));
+
+    // Verify both attempts have started before proceeding
+    assertEquals("Both attempts should have started", 0, 
bothAttemptsStarted.getCount());
+
+    // Allow both operations to proceed simultaneously
+    retryCanProceed.countDown();
+    rotationTaskCanProceed.countDown();
+
+    // Wait for both operations to complete
+    rotationThread.join();
+
+    // Verify the final state
+    LogFileWriter finalWriter = logWriter.getWriter();
+    assertNotNull("Final writer should not be null", finalWriter);
+    assertTrue("Writer should have been rotated", finalWriter != 
initialWriter);
+
+    // Verify only one rotation actually occurred
+    assertEquals("Should have only one actual rotation", 1, 
rotationCount.get());
+
+    // Verify all operations completed successfully
+    logWriter.sync();
+
+    // Verify the sequence of operations through the latches
+    assertTrue("Retry should have started", retryStarted.getCount() == 0);
+    assertTrue("Rotation task should have started", 
rotationTaskStarted.getCount() == 0);
+  }
+
+  /**
+   * Tests that multiple sync requests are consolidated into a single sync operation on the inner
+   * writer when they occur in quick succession. Verifies that the Disruptor batching and
+   * LogEventHandler processing correctly consolidates multiple sync requests into a single sync
+   * operation, while still completing all sync futures successfully.
+   */
+  @Test
+  public void testSyncConsolidation() throws Exception {
+    final String tableName = "TBLSC";
+    final Mutation put1 = LogFileTestUtil.newPut("row1", 1, 1);
+    final long commitId1 = 1L;
+    final Mutation put2 = LogFileTestUtil.newPut("row2", 2, 1);
+    final long commitId2 = 2L;
+    final Mutation put3 = LogFileTestUtil.newPut("row3", 3, 1);
+    final long commitId3 = 3L;
+
+    LogFileWriter innerWriter = logWriter.getWriter();
+    assertNotNull("Inner writer should not be null", innerWriter);
+
+    // Configure writer to briefly hold up the LogEventHandler upon first append.
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(50); // Delay to allow multiple events to be posted
+        return invocation.callRealMethod();
+      }
+    }).when(innerWriter).append(eq(tableName), eq(commitId1), eq(put1));
+
+    // Post appends and three syncs in quick succession. The first append will be delayed long
+    // enough for the three syncs to appear in a single Disruptor batch. Then they should all
+    // be consolidated into a single sync.
+    logWriter.append(tableName, commitId1, put1);
+    logWriter.sync();
+    logWriter.append(tableName, commitId2, put2);
+    logWriter.sync();
+    logWriter.append(tableName, commitId3, put3);
+    logWriter.sync();
+
+    // Verify the sequence of operations on the inner writer: the three appends, then exactly
+    // one sync.
+    InOrder inOrder = Mockito.inOrder(innerWriter);
+    inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId1), eq(put1));
+    inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId2), eq(put2));
+    inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId3), eq(put3));
+    inOrder.verify(innerWriter, times(1)).sync(); // Only one sync should be called
+  }
+
+  static class TestableReplicationLog extends ReplicationLog {
+
+    protected TestableReplicationLog(Configuration conf, ServerName serverName) {
+      super(conf, serverName);
+    }
+
+    @Override
+    protected LogFileWriter createNewWriter(FileSystem fs, URI url) throws IOException {
+      return spy(super.createNewWriter(fs, url));
+    }
+
+    @Override
+    protected MetricsReplicationLogSource createMetricsSource() {
+      return new MetricsReplicationLogSourceImpl();
+    }
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index f41f7dad75..24fc8e655f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,10 +98,10 @@
     <jackson-bom.version>2.18.4.1</jackson-bom.version>
     <netty-bom.version>4.1.126.Final</netty-bom.version>
     <antlr.version>3.5.2</antlr.version>
+    <disruptor.version>3.3.6</disruptor.version>
     <!-- Only used for tests with HBase 2.1-2.4 -->
     <reload4j.version>1.2.19</reload4j.version>
     <log4j2.version>2.18.0</log4j2.version>
-    <disruptor.version>3.3.6</disruptor.version>
     <slf4j.version>1.7.36</slf4j.version>
     <!-- com.google repo will be used except on Aarch64 platform. -->
     <protobuf.group>com.google.protobuf</protobuf.group>

