This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
     new a0b899421f  PHOENIX-7567 Replication Log Writer (Synchronous mode) (#2144)
a0b899421f is described below
commit a0b899421f57011da1b23e271b489bec005ba25c
Author: Andrew Purtell <[email protected]>
AuthorDate: Thu Dec 18 10:58:13 2025 -0800
PHOENIX-7567 Replication Log Writer (Synchronous mode) (#2144)
Conflicts:
phoenix-core-server/pom.xml
phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
---
phoenix-core-server/pom.xml | 12 +
.../apache/phoenix/replication/ReplicationLog.java | 1085 ++++++++++++++++
.../phoenix/replication/log/LogFileWriter.java | 17 +-
.../metrics/MetricsReplicationLogSource.java | 114 ++
.../metrics/MetricsReplicationLogSourceImpl.java | 133 ++
.../metrics/ReplicationLogMetricValues.java | 83 ++
.../phoenix/replication/metrics/package-info.java | 23 +
.../apache/phoenix/replication/package-info.java | 23 +
.../phoenix/replication/tool/LogFileAnalyzer.java | 209 +++
.../org/apache/phoenix/replication/tool/README.md | 70 +
.../phoenix/replication/tool/package-info.java | 22 +
.../phoenix/replication/ReplicationLogTest.java | 1344 ++++++++++++++++++++
pom.xml | 2 +-
13 files changed, 3135 insertions(+), 2 deletions(-)
diff --git a/phoenix-core-server/pom.xml b/phoenix-core-server/pom.xml
index 54c6afe39b..ef5140feb9 100644
--- a/phoenix-core-server/pom.xml
+++ b/phoenix-core-server/pom.xml
@@ -87,6 +87,14 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-metrics-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop-compat</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-hadoop2-compat</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
@@ -177,6 +185,10 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.lmax</groupId>
+ <artifactId>disruptor</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
new file mode 100644
index 0000000000..3a2d9567a0
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -0,0 +1,1085 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.YieldingWaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogSource;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogSourceImpl;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * The ReplicationLog implements a high-performance logging system for mutations that need to
+ * be replicated to another cluster.
+ * <p>
+ * Key features:
+ * <ul>
+ * <li>Asynchronous append operations with batching for high throughput</li>
+ * <li>Controlled blocking sync operations with timeout and retry logic</li>
+ * <li>Automatic log rotation based on time and size thresholds</li>
+ * <li>Sharded directory structure for better HDFS performance</li>
+ * <li>Metrics for monitoring</li>
+ * <li>Fail-stop behavior on critical errors</li>
+ * </ul>
+ * <p>
+ * The class supports three replication modes (though currently only SYNC is implemented):
+ * <ul>
+ * <li>SYNC: Direct writes to standby cluster (default)</li>
+ * <li>STORE_AND_FORWARD: Local storage when standby is unavailable</li>
+ * <li>SYNC_AND_FORWARD: Concurrent direct writes and queue draining</li>
+ * </ul>
+ * <p>
+ * Key configuration properties:
+ * <ul>
+ * <li>{@link #REPLICATION_STANDBY_HDFS_URL_KEY}: URL for standby cluster HDFS</li>
+ * <li>{@link #REPLICATION_FALLBACK_HDFS_URL_KEY}: URL for local fallback storage</li>
+ * <li>{@link #REPLICATION_NUM_SHARDS_KEY}: Number of shard directories</li>
+ * <li>{@link #REPLICATION_LOG_ROTATION_TIME_MS_KEY}: Time-based rotation interval</li>
+ * <li>{@link #REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY}: Size-based rotation threshold</li>
+ * <li>{@link #REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY}: Compression algorithm</li>
+ * </ul>
+ * <p>
+ * This class is intended to be thread-safe.
+ * <p>
+ * Architecture Overview:
+ *
+ * <pre>
+ * ┌──────────────────────────────────────────────────────────────────────┐
+ * │ ReplicationLog │
+ * │ │
+ * │ ┌─────────────┐ ┌────────────────────────────────────────────┐ │
+ * │ │ │ │ │ │
+ * │ │ Producers │ │ Disruptor Ring Buffer │ │
+ * │ │ (append/ │────▶│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
+ * │ │ sync) │ │ │ Event 1 │ │ Event 2 │ │ Event 3 │ ... │ │
+ * │ │ │ │ └─────────┘ └─────────┘ └─────────┘ │ │
+ * │ └─────────────┘ └────────────────────────────────────────────┘ │
+ * │ │ │
+ * │ ▼ │
+ * │ ┌─────────────────────────────────────────────────────────────┐ │
+ * │ │ │ │
+ * │ │ LogEventHandler │ │
+ * │ │ ┌──────────────────────────────────────────────────────┐ │ │
+ * │ │ │ │ │ │
+ * │ │ │ - Batch Management │ │ │
+ * │ │ │ - Writer Rotation │ │ │
+ * │ │ │ - Error Handling │ │ │
+ * │ │ │ - Mode Transitions │ │ │
+ * │ │ │ │ │ │
+ * │ │ └──────────────────────────────────────────────────────┘ │ │
+ * │ │ │ │ │
+ * │ │ ▼ │ │
+ * │ │ ┌──────────────────────────────────────────────────────┐ │ │
+ * │ │ │ │ │ │
+ * │ │ │ LogFileWriter │ │ │
+ * │ │ │ - File Management │ │ │
+ * │ │ │ - Compression │ │ │
+ * │ │ │ - HDFS Operations │ │ │
+ * │ │ │ │ │ │
+ * │ │ └──────────────────────────────────────────────────────┘ │ │
+ * │ └─────────────────────────────────────────────────────────────┘ │
+ * └──────────────────────────────────────────────────────────────────────┘
+ * </pre>
+ * <p>
+ * A high-performance ring buffer decouples the API from the complexity of writer management.
+ * Calls to append generally return quickly; sync blocks the caller until the sync operation
+ * succeeds (or times out). An internal single-threaded process handles these events,
+ * encapsulating the complexity of batching mutations for efficiency, consolidating multiple
+ * in-flight syncs, rotating writers based on time or size, error handling and retries, and
+ * mode transitions.
+ */
[email protected](
+ value = { "EI_EXPOSE_REP", "EI_EXPOSE_REP2", "MS_EXPOSE_REP" },
justification = "Intentional")
+public class ReplicationLog {
+
+ /** The path on the standby HDFS where log files should be written. (The "IN" directory.) */
+ public static final String REPLICATION_STANDBY_HDFS_URL_KEY =
+ "phoenix.replication.log.standby.hdfs.url";
+ /**
+ * The path on the active HDFS where log files should be written when we have fallen back to
+ * store and forward mode. (The "OUT" directory.)
+ */
+ public static final String REPLICATION_FALLBACK_HDFS_URL_KEY =
+ "phoenix.replication.log.fallback.hdfs.url";
+ /**
+ * The number of shards (subfolders) to maintain in the "IN" directory.
+ * <p>
+ * Shard directories have the format shard-NNNNN, e.g. shard-00001. The maximum value is 100000.
+ */
+ public static final String REPLICATION_NUM_SHARDS_KEY = "phoenix.replication.log.shards";
+ public static final int DEFAULT_REPLICATION_NUM_SHARDS = 1000;
+ public static final int MAX_REPLICATION_NUM_SHARDS = 100000;
+ /** Replication log rotation time trigger, default is 1 minute */
+ public static final String REPLICATION_LOG_ROTATION_TIME_MS_KEY =
+ "phoenix.replication.log.rotation.time.ms";
+ public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 1000L;
+ /** Replication log rotation size trigger, default is 256 MB */
+ public static final String REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY =
+ "phoenix.replication.log.rotation.size.bytes";
+ public static final long DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES = 256 * 1024 * 1024L;
+ /** Replication log rotation size trigger percentage, default is 0.95 */
+ public static final String REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY =
+ "phoenix.replication.log.rotation.size.percentage";
+ public static final double DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE = 0.95;
+ /** Replication log compression, default is "NONE" */
+ public static final String REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY =
+ "phoenix.replication.log.compression";
+ public static final String DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM = "NONE";
+ public static final String REPLICATION_LOG_RINGBUFFER_SIZE_KEY =
+ "phoenix.replication.log.ringbuffer.size";
+ public static final int DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE = 1024 * 32; // Too big?
+ public static final String REPLICATION_LOG_SYNC_TIMEOUT_KEY =
+ "phoenix.replication.log.sync.timeout.ms";
+ public static final long DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT = 1000 * 30;
+ public static final String REPLICATION_LOG_SYNC_RETRIES_KEY =
+ "phoenix.replication.log.sync.retries";
+ public static final int DEFAULT_REPLICATION_LOG_SYNC_RETRIES = 5;
+ public static final String REPLICATION_LOG_ROTATION_RETRIES_KEY =
+ "phoenix.replication.log.rotation.retries";
+ public static final int DEFAULT_REPLICATION_LOG_ROTATION_RETRIES = 5;
+ public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
+ "phoenix.replication.log.retry.delay.ms";
+ public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
+
+ public static final String SHARD_DIR_FORMAT = "shard%05d";
+ public static final String FILE_NAME_FORMAT = "%d-%s.plog";
+
+ static final byte EVENT_TYPE_DATA = 0;
+ static final byte EVENT_TYPE_SYNC = 1;
+
+ static final Logger LOG = LoggerFactory.getLogger(ReplicationLog.class);
+
+ protected static volatile ReplicationLog instance;
+
+ protected final Configuration conf;
+ protected final ServerName serverName;
+ protected FileSystem standbyFs;
+ protected FileSystem fallbackFs; // For store-and-forward (future use)
+ protected int numShards;
+ protected URI standbyUrl;
+ protected URI fallbackUrl; // For store-and-forward (future use)
+ protected final long rotationTimeMs;
+ protected final long rotationSizeBytes;
+ protected final int maxRotationRetries;
+ protected final Compression.Algorithm compression;
+ protected final ReentrantLock lock = new ReentrantLock();
+ protected volatile LogFileWriter currentWriter; // Current writer
+ protected final AtomicLong lastRotationTime = new AtomicLong();
+ protected final AtomicLong writerGeneration = new AtomicLong();
+ protected final AtomicLong rotationFailures = new AtomicLong(0);
+ protected ScheduledExecutorService rotationExecutor;
+ protected final int ringBufferSize;
+ protected final long syncTimeoutMs;
+ protected Disruptor<LogEvent> disruptor;
+ protected RingBuffer<LogEvent> ringBuffer;
+ protected final ConcurrentHashMap<Path, Object> shardMap = new ConcurrentHashMap<>();
+ protected final MetricsReplicationLogSource metrics;
+ protected volatile boolean isClosed = false;
+
+ /**
+ * Tracks the current replication mode of the ReplicationLog.
+ * <p>
+ * The replication mode determines how mutations are handled:
+ * <ul>
+ * <li>SYNC: Normal operation where mutations are written directly to the standby cluster's HDFS.
+ * This is the default and primary mode of operation.</li>
+ * <li>STORE_AND_FORWARD: Fallback mode when the standby cluster's HDFS is unavailable. Mutations
+ * are stored locally and will be forwarded when connectivity is restored.</li>
+ * <li>SYNC_AND_FORWARD: Transitional mode where new mutations are written directly to the standby
+ * cluster while concurrently draining the local queue of previously stored mutations.</li>
+ * </ul>
+ * <p>
+ * Mode transitions occur automatically based on the availability of the standby cluster's HDFS
+ * and the state of the local mutation queue.
+ */
+ protected enum ReplicationMode {
+ /**
+ * Normal operation where mutations are written directly to the standby cluster's HDFS. This
+ * is the default and primary mode of operation.
+ */
+ SYNC,
+
+ /**
+ * Fallback mode when the standby cluster's HDFS is unavailable. Mutations are stored locally
+ * and will be forwarded when connectivity is restored.
+ */
+ STORE_AND_FORWARD,
+
+ /**
+ * Transitional mode where new mutations are written directly to the standby cluster while
+ * concurrently draining the local queue of previously stored mutations. This mode is entered
+ * when connectivity to the standby cluster is restored while there are still mutations in the
+ * local queue.
+ */
+ SYNC_AND_FORWARD;
+ }
+
+ /** The reason for requesting a log rotation. */
+ protected enum RotationReason {
+ /** Rotation requested due to time threshold being exceeded. */
+ TIME,
+ /** Rotation requested due to size threshold being exceeded. */
+ SIZE,
+ /** Rotation requested due to an error condition. */
+ ERROR;
+ }
+
+ /**
+ * The current replication mode. Always SYNC for now.
+ * <p>
+ * TODO: Implement mode transitions to STORE_AND_FORWARD when standby becomes unavailable.
+ * <p>
+ * TODO: Implement mode transitions to SYNC_AND_FORWARD when draining queue.
+ */
+ protected volatile ReplicationMode currentMode = ReplicationMode.SYNC;
+
+ // TODO: Add configuration keys for store-and-forward behavior
+ // - Maximum retry attempts before switching to store-and-forward
+ // - Retry delay between attempts
+ // - Queue drain batch size
+ // - Queue drain interval
+
+ // TODO: Add state tracking fields
+ // - Queue of pending changes when in store-and-forward mode
+ // - Timestamp of last successful standby write
+ // - Error count for tracking consecutive failures
+
+ // TODO: Add methods for state transitions
+ // - switchToStoreAndForward() - Called when standby becomes unavailable
+ // - switchToSync() - Called when standby becomes available again
+ // - drainQueue() - Background task to process queued changes
+
+ // TODO: Enhance error handling in LogEventHandler
+ // - Track consecutive failures
+ // - Switch to store-and-forward after max retries
+
+ // TODO: Implement queue management for store-and-forward mode
+ // - Implement queue persistence to handle RegionServer restarts
+ // - Implement queue draining when in SYNC_AND_FORWARD state
+ // - Implement automatic recovery from temporary network issues
+ // - Add configurable thresholds for switching to store-and-forward based on write latency
+ // - Add circuit breaker pattern to prevent overwhelming the standby cluster
+ // - Add queue size limits and backpressure mechanisms
+ // - Add queue metrics for monitoring (queue size, oldest entry age, etc.)
+
+ // TODO: Enhance metrics for replication health monitoring
+ // - Add metrics for replication lag between active and standby
+ // - Track time spent in each replication mode (SYNC, STORE_AND_FORWARD, SYNC_AND_FORWARD)
+ // - Monitor queue drain rate and backlog size
+ // - Track consecutive failures and mode transition events
+
+ /**
+ * Gets the singleton instance of the ReplicationLog using the lazy initializer pattern.
+ * Initializes the instance if it hasn't been created yet.
+ * @param conf Configuration object.
+ * @param serverName The server name.
+ * @return The singleton ReplicationLog instance.
+ * @throws IOException If initialization fails.
+ */
+ public static ReplicationLog get(Configuration conf, ServerName serverName) throws IOException {
+ if (instance == null) {
+ synchronized (ReplicationLog.class) {
+ if (instance == null) {
+ // Complete initialization before assignment
+ ReplicationLog logManager = new ReplicationLog(conf, serverName);
+ logManager.init();
+ instance = logManager;
+ }
+ }
+ }
+ return instance;
+ }
+
+ protected ReplicationLog(Configuration conf, ServerName serverName) {
+ this.conf = conf;
+ this.serverName = serverName;
+ this.rotationTimeMs =
+ conf.getLong(REPLICATION_LOG_ROTATION_TIME_MS_KEY, DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS);
+ long rotationSize = conf.getLong(REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
+ DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES);
+ double rotationSizePercent = conf.getDouble(REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY,
+ DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE);
+ this.rotationSizeBytes = (long) (rotationSize * rotationSizePercent);
+ this.maxRotationRetries =
+ conf.getInt(REPLICATION_LOG_ROTATION_RETRIES_KEY, DEFAULT_REPLICATION_LOG_ROTATION_RETRIES);
+ this.numShards = conf.getInt(REPLICATION_NUM_SHARDS_KEY, DEFAULT_REPLICATION_NUM_SHARDS);
+ String compressionName = conf.get(REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY,
+ DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM);
+ Compression.Algorithm compression = Compression.Algorithm.NONE;
+ if (!DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM.equalsIgnoreCase(compressionName)) {
+ try {
+ compression = Compression.getCompressionAlgorithmByName(compressionName);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Unknown compression type " + compressionName + ", using NONE", e);
+ }
+ }
+ this.compression = compression;
+ this.ringBufferSize =
+ conf.getInt(REPLICATION_LOG_RINGBUFFER_SIZE_KEY, DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE);
+ this.syncTimeoutMs =
+ conf.getLong(REPLICATION_LOG_SYNC_TIMEOUT_KEY, DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT);
+ this.metrics = createMetricsSource();
+ }
+
+ /** Creates a new metrics source for monitoring replication log operations. */
+ protected MetricsReplicationLogSource createMetricsSource() {
+ return new MetricsReplicationLogSourceImpl();
+ }
+
+ /** Returns the metrics source for monitoring replication log operations. */
+ public MetricsReplicationLogSource getMetrics() {
+ return metrics;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void init() throws IOException {
+ if (numShards > MAX_REPLICATION_NUM_SHARDS) {
+ throw new IllegalArgumentException(REPLICATION_NUM_SHARDS_KEY + " is " + numShards
+ + ", but the limit is " + MAX_REPLICATION_NUM_SHARDS);
+ }
+ initializeFileSystems();
+ // Start time based rotation.
+ lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
+ startRotationExecutor();
+ // Create the initial writer. Do this before we call LogEventHandler.init().
+ currentWriter = createNewWriter(standbyFs, standbyUrl);
+ // Initialize the Disruptor. We use ProducerType.MULTI because multiple handlers might
+ // call append concurrently. We use YieldingWaitStrategy for low latency. When the ring
+ // buffer is full (controlled by REPLICATION_LOG_RINGBUFFER_SIZE_KEY), producers
+ // calling ringBuffer.next() will effectively block (by yielding/spinning), creating
+ // backpressure on the callers. This ensures appends don't proceed until there is space.
+ disruptor = new Disruptor<>(
+ LogEvent.EVENT_FACTORY, ringBufferSize, new ThreadFactoryBuilder()
+ .setNameFormat("ReplicationLogEventHandler-%d").setDaemon(true).build(),
+ ProducerType.MULTI, new YieldingWaitStrategy());
+ LogEventHandler eventHandler = new LogEventHandler();
+ eventHandler.init();
+ disruptor.handleEventsWith(eventHandler);
+ LogExceptionHandler exceptionHandler = new LogExceptionHandler();
+ disruptor.setDefaultExceptionHandler(exceptionHandler);
+ ringBuffer = disruptor.start();
+ LOG.info("ReplicationLogWriter started with ring buffer size {}", ringBufferSize);
+ }
+
+ /**
+ * Append a mutation to the log. This method is non-blocking and returns quickly, unless the
+ * ring buffer is full. The actual write happens asynchronously. We expect multiple append()
+ * calls followed by a sync(). The appends will be batched by the Disruptor. The ring buffer
+ * is not expected to fill under normal operation, but it can (and should) if the log file
+ * writer is unable to make progress due to an HDFS level disruption. In that condition this
+ * method will block until the append can be inserted.
+ * <p>
+ * An internal error may trigger fail-stop behavior. Subsequent to fail-stop, this method will
+ * throw an IOException("Closed"). No further appends are allowed.
+ * @param tableName The name of the HBase table the mutation applies to.
+ * @param commitId The commit identifier (e.g., SCN) associated with the mutation.
+ * @param mutation The HBase Mutation (Put or Delete) to be logged.
+ * @throws IOException If the log has been closed.
+ */
+ public void append(String tableName, long commitId, Mutation mutation) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Append: table={}, commitId={}, mutation={}", tableName, commitId, mutation);
+ }
+ if (isClosed) {
+ throw new IOException("Closed");
+ }
+ long startTime = System.nanoTime();
+ // ringBuffer.next() claims the next sequence number. Because we initialize the Disruptor
+ // with ProducerType.MULTI and the blocking YieldingWaitStrategy this call WILL BLOCK if
+ // the ring buffer is full, thus providing backpressure to the callers.
+ long sequence = ringBuffer.next();
+ try {
+ LogEvent event = ringBuffer.get(sequence);
+ event.setValues(EVENT_TYPE_DATA, new Record(tableName, commitId, mutation), null);
+ metrics.updateAppendTime(System.nanoTime() - startTime);
+ } finally {
+ // Update ring buffer events metric
+ ringBuffer.publish(sequence);
+ }
+ }
+
+ /**
+ * Ensures all previously appended records are durably persisted. This method blocks until the
+ * sync operation completes or fails, potentially after internal retries. All in-flight appends
+ * are batched and provided to the underlying LogWriter, which will then be synced. If there is
+ * a problem syncing the LogWriter we will retry, up to the retry limit, rolling the writer for
+ * each retry.
+ * <p>
+ * An internal error may trigger fail-stop behavior. Subsequent to fail-stop, this method will
+ * throw an IOException("Closed"). No further syncs are allowed.
+ * <p>
+ * NOTE: When the ReplicationLog is capable of switching between synchronous and fallback
+ * (store-and-forward) writers, this will be considerably more robust. Right now we will still
+ * try to roll the synchronous writer a few times before giving up.
+ * @throws IOException If the sync operation fails after retries, or if interrupted.
+ */
+ public void sync() throws IOException {
+ if (isClosed) {
+ throw new IOException("Closed");
+ }
+ syncInternal();
+ }
+
+ /**
+ * Internal implementation of sync that publishes a sync event to the ring buffer and waits
+ * for completion.
+ */
+ protected void syncInternal() throws IOException {
+ long startTime = System.nanoTime();
+ CompletableFuture<Void> syncFuture = new CompletableFuture<>();
+ long sequence = ringBuffer.next();
+ try {
+ LogEvent event = ringBuffer.get(sequence);
+ event.setValues(EVENT_TYPE_SYNC, null, syncFuture);
+ } finally {
+ ringBuffer.publish(sequence);
+ }
+ LOG.trace("Published EVENT_TYPE_SYNC at sequence {}", sequence);
+ try {
+ // Wait for the event handler to process up to and including this sync event
+ syncFuture.get(syncTimeoutMs, TimeUnit.MILLISECONDS);
+ metrics.updateSyncTime(System.nanoTime() - startTime);
+ } catch (InterruptedException e) {
+ // Almost certainly the regionserver is shutting down or aborting.
+ // TODO: Do we need to do more here?
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Interrupted while waiting for sync");
+ } catch (ExecutionException e) {
+ LOG.error("Sync operation failed", e.getCause());
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw new IOException("Sync operation failed", e.getCause());
+ }
+ } catch (TimeoutException e) {
+ String message = "Sync operation timed out";
+ LOG.error(message);
+ throw new IOException(message, e);
+ }
+ }
+
+ /** Initializes the standby and fallback filesystems and creates their log directories. */
+ protected void initializeFileSystems() throws IOException {
+ String standbyUrlString = conf.get(REPLICATION_STANDBY_HDFS_URL_KEY);
+ if (standbyUrlString == null) {
+ throw new IOException(REPLICATION_STANDBY_HDFS_URL_KEY + " is not configured");
+ }
+ // Only validate that the URI is well formed. We should not assume the scheme must be
+ // "hdfs" because perhaps the operator will substitute another FileSystem implementation
+ // for DistributedFileSystem.
+ try {
+ this.standbyUrl = new URI(standbyUrlString);
+ } catch (URISyntaxException e) {
+ throw new IOException(REPLICATION_STANDBY_HDFS_URL_KEY + " is not valid", e);
+ }
+ String fallbackUrlString = conf.get(REPLICATION_FALLBACK_HDFS_URL_KEY);
+ if (fallbackUrlString != null) {
+ // Only validate that the URI is well formed, as above.
+ try {
+ this.fallbackUrl = new URI(fallbackUrlString);
+ } catch (URISyntaxException e) {
+ throw new IOException(REPLICATION_FALLBACK_HDFS_URL_KEY + " is not valid", e);
+ }
+ this.fallbackFs = getFileSystem(fallbackUrl);
+ Path fallbackLogDir = new Path(fallbackUrl.getPath());
+ if (!fallbackFs.exists(fallbackLogDir)) {
+ LOG.info("Creating directory {}", fallbackUrlString);
+ if (!this.fallbackFs.mkdirs(fallbackLogDir)) {
+ throw new IOException("Failed to create directory: " +
fallbackUrlString);
+ }
+ }
+ } else {
+ // We support a synchronous replication only option if store-and-forward configuration
+ // keys are missing. This is outside the scope of the design spec but potentially
+ // useful for testing and also allows an operator to prefer failover consistency and
+ // simplicity over availability, even if that is not recommended. Log it at WARN level
+ // to focus appropriate attention. (Should it be ERROR?)
+ LOG.warn("Fallback not configured ({}), store-and-forward DISABLED.",
+ REPLICATION_FALLBACK_HDFS_URL_KEY);
+ this.fallbackFs = null;
+ }
+ // Configuration is sorted, and possibly store-and-forward directories have been created,
+ // now create the standby side directories as needed.
+ this.standbyFs = getFileSystem(standbyUrl);
+ Path standbyLogDir = new Path(standbyUrl.getPath());
+ if (!standbyFs.exists(standbyLogDir)) {
+ LOG.info("Creating directory {}", standbyUrlString);
+ if (!standbyFs.mkdirs(standbyLogDir)) {
+ throw new IOException("Failed to create directory: " +
standbyUrlString);
+ }
+ }
+ }
+
+ /** Gets a FileSystem instance for the given URI using the current configuration. */
+ protected FileSystem getFileSystem(URI uri) throws IOException {
+ return FileSystem.get(uri, conf);
+ }
+
+ /** Calculates the interval for checking log rotation based on the configured rotation time. */
+ protected long getRotationCheckInterval(long rotationTimeMs) {
+ long interval;
+ if (rotationTimeMs > 0) {
+ interval = rotationTimeMs / 4;
+ } else {
+ // If rotation time is not configured or invalid, use a sensible default like 10 seconds
+ interval = 10000L;
+ }
+ return interval;
+ }
+
+ /** Starts the background task for time-based log rotation. */
+ protected void startRotationExecutor() {
+ Preconditions.checkState(rotationExecutor == null, "Rotation executor already started");
+ rotationExecutor = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat("ReplicationLogRotator-%d").setDaemon(true).build());
+ long interval = getRotationCheckInterval(rotationTimeMs);
+ rotationExecutor.scheduleWithFixedDelay(new LogRotationTask(), interval, interval,
+ TimeUnit.MILLISECONDS);
+ }
+
+ /** Stops the background task for time-based log rotation. */
+ protected void stopRotationExecutor() {
+ if (rotationExecutor != null) {
+ rotationExecutor.shutdownNow();
+ rotationExecutor = null;
+ }
+ }
+
+ /** Gets the current writer, rotating it if necessary based on size thresholds. */
+ protected LogFileWriter getWriter() throws IOException {
+ lock.lock();
+ try {
+ if (shouldRotate()) {
+ rotateLog(RotationReason.SIZE);
+ }
+ return currentWriter;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Checks if the current log file needs to be rotated based on time or size. Must be called
+ * under lock.
+ * @return true if rotation is needed, false otherwise.
+ * @throws IOException If an error occurs checking the file size.
+ */
+ protected boolean shouldRotate() throws IOException {
+ if (currentWriter == null) {
+ LOG.warn("Current writer is null, forcing rotation.");
+ return true;
+ }
+ // Check time threshold
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ long last = lastRotationTime.get();
+ if (now - last >= rotationTimeMs) {
+ LOG.debug("Rotating log file due to time threshold ({} ms elapsed,
threshold {} ms)",
+ now - last, rotationTimeMs);
+ return true;
+ }
+
+ // Check size threshold (using actual file size for accuracy)
+ long currentSize = currentWriter.getLength();
+ if (currentSize >= rotationSizeBytes) {
+ LOG.debug("Rotating log file due to size threshold ({} bytes, threshold
{} bytes)",
+ currentSize, rotationSizeBytes);
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Closes the current log writer and opens a new one, updating rotation metrics.
+ * <p>
+ * This method handles the rotation of log files, which can be triggered by:
+ * <ul>
+ * <li>Time threshold exceeded (TIME)</li>
+ * <li>Size threshold exceeded (SIZE)</li>
+ * <li>Error condition requiring rotation (ERROR)</li>
+ * </ul>
+ * <p>
+ * The method implements retry logic for handling rotation failures. If rotation fails, it
+ * retries up to maxRotationRetries times. If the number of failures exceeds maxRotationRetries,
+ * an exception is thrown. Otherwise, it logs a warning and continues with the current writer.
+ * <p>
+ * The method is thread-safe and uses a lock to ensure atomic rotation operations.
+ * @param reason The reason for requesting log rotation
+ * @return The new LogFileWriter instance if rotation succeeded, or the current writer if
+ * rotation failed
+ * @throws IOException if rotation fails after exceeding maxRotationRetries
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UL_UNRELEASED_LOCK",
+ justification = "False positive")
+ protected LogFileWriter rotateLog(RotationReason reason) throws IOException {
+ lock.lock();
+ try {
+ // Try to get the new writer first. If it fails we continue using the current writer.
+ // Increment the writer generation
+ LogFileWriter newWriter = createNewWriter(standbyFs, standbyUrl);
+ LOG.debug("Created new writer: {}", newWriter);
+ // Close the current writer
+ if (currentWriter != null) {
+ LOG.debug("Closing current writer: {}", currentWriter);
+ closeWriter(currentWriter);
+ }
+ currentWriter = newWriter;
+ lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
+ rotationFailures.set(0);
+ metrics.incrementRotationCount();
+ switch (reason) {
+ case TIME:
+ metrics.incrementTimeBasedRotationCount();
+ break;
+ case SIZE:
+ metrics.incrementSizeBasedRotationCount();
+ break;
+ case ERROR:
+ metrics.incrementErrorBasedRotationCount();
+ break;
+ }
+ } catch (IOException e) {
+ // If we fail to rotate the log, we increment the failure counter. If we have exceeded
+ // the maximum number of retries, we close the log and throw the exception. Otherwise
+ // we log a warning and continue.
+ metrics.incrementRotationFailureCount();
+ long numFailures = rotationFailures.getAndIncrement();
+ if (numFailures >= maxRotationRetries) {
+ LOG.warn("Failed to rotate log (attempt {}/{}), closing log", numFailures,
+ maxRotationRetries, e);
+ closeOnError();
+ throw e;
+ }
+ LOG.warn("Failed to rotate log (attempt {}/{}), retrying...", numFailures, maxRotationRetries,
+ e);
+ } finally {
+ lock.unlock();
+ }
+ return currentWriter;
+ }
+
+ /**
+ * Creates a new log file path in a sharded directory structure based on server name and
+ * timestamp.
+ */
+ protected Path makeWriterPath(FileSystem fs, URI url) throws IOException {
+ long timestamp = EnvironmentEdgeManager.currentTimeMillis();
+ // To have all logs for a given regionserver appear in the same shard, hash only the
+ // serverName. However we expect some regionservers will have significantly more load than
+ // others so we instead distribute the logs over all of the shards randomly for a more even
+ // overall distribution by also hashing the timestamp.
+ int shard = (serverName.hashCode() ^ Long.hashCode(timestamp)) % numShards;
+ Path shardPath = new Path(url.getPath(), String.format(SHARD_DIR_FORMAT, shard));
+ // Ensure the shard directory exists. We track which shard directories we have probed or
+ // created to avoid a round trip to the namenode for repeats.
+ IOException[] exception = new IOException[1];
+ shardMap.computeIfAbsent(shardPath, p -> {
+ try {
+ if (!fs.exists(p)) {
+ if (!fs.mkdirs(p)) {
+ throw new IOException("Could not create path: " + p);
+ }
+ }
+ } catch (IOException e) {
+ exception[0] = e;
+ }
+ return p;
+ });
+ // If we faced an exception in computeIfAbsent, throw it
+ if (exception[0] != null) {
+ throw exception[0];
+ }
+ Path filePath = new Path(shardPath, String.format(FILE_NAME_FORMAT, timestamp, serverName));
+ return filePath;
+ }
+
+ /** Creates and initializes a new LogFileWriter for the given filesystem and URL. */
+ protected LogFileWriter createNewWriter(FileSystem fs, URI url) throws IOException {
+ Path filePath = makeWriterPath(fs, url);
+ LogFileWriterContext writerContext = new LogFileWriterContext(conf).setFileSystem(fs)
+ .setFilePath(filePath).setCompression(compression);
+ LogFileWriter newWriter = new LogFileWriter();
+ try {
+ newWriter.init(writerContext);
+ newWriter.setGeneration(writerGeneration.incrementAndGet());
+ } catch (IOException e) {
+ LOG.error("Failed to initialize new LogFileWriter for path {}",
filePath, e);
+ throw e;
+ }
+ return newWriter;
+ }
+
+ /** Closes the given writer, logging any errors that occur during close. */
+ protected void closeWriter(LogFileWriter writer) {
+ if (writer == null) {
+ return;
+ }
+ try {
+ writer.close();
+ } catch (IOException e) {
+ // For now, just log and continue
+ LOG.error("Error closing log writer: " + writer, e);
+ }
+ }
+
+ /**
+ * Force closes the log upon an unrecoverable internal error. This is a fail-stop behavior:
+ * once called, the log is marked as closed, the Disruptor is halted, and all subsequent
+ * append() and sync() calls will throw an IOException("Closed"). This ensures that no further
+ * operations are attempted on a log that has encountered a critical error.
+ */
+ protected void closeOnError() {
+ lock.lock();
+ try {
+ if (isClosed) {
+ return;
+ }
+ isClosed = true;
+ } finally {
+ lock.unlock();
+ }
+ // Stop the time based rotation check.
+ stopRotationExecutor();
+ // We expect a final sync will not work. Just close the inner writer.
+ closeWriter(currentWriter);
+ // Directly halt the disruptor. shutdown() would wait for events to drain. We are expecting
+ // that will not work.
+ disruptor.halt();
+ }
+
+ /** Closes the log. */
+ public void close() {
+ lock.lock();
+ try {
+ if (isClosed) {
+ return;
+ }
+ isClosed = true;
+ } finally {
+ lock.unlock();
+ }
+ // Stop the time based rotation check.
+ stopRotationExecutor();
+ // Sync before shutting down to flush all pending appends.
+ try {
+ syncInternal();
+ disruptor.shutdown(); // Wait for a clean shutdown.
+ } catch (IOException e) {
+ LOG.warn("Error during final sync on close", e);
+ disruptor.halt(); // Go directly to halt.
+ }
+ // We must wait for the disruptor to finish before closing the current writer.
+ closeWriter(currentWriter);
+ }
+
+ /** Implements time based rotation independent of in-line checking. */
+ protected class LogRotationTask implements Runnable {
+ @Override
+ public void run() {
+ if (isClosed) {
+ return;
+ }
+ // Use tryLock with a timeout to avoid blocking indefinitely if another thread holds
+ // the lock for an unexpectedly long time (e.g., during a problematic rotation).
+ boolean acquired = false;
+ try {
+ // Wait a short time for the lock
+ acquired = lock.tryLock(1, TimeUnit.SECONDS);
+ if (acquired) {
+ // Check only the time condition here, size is handled by getWriter
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ long last = lastRotationTime.get();
+ if (!isClosed && now - last >= rotationTimeMs) {
+ LOG.debug("Time based rotation needed ({} ms elapsed, threshold {}
ms).", now - last,
+ rotationTimeMs);
+ try {
+ rotateLog(RotationReason.TIME); // rotateLog updates
lastRotationTime
+ } catch (IOException e) {
+ LOG.error("Failed to rotate log, currentWriter is {}",
currentWriter, e);
+ // More robust error handling goes here once the
store-and-forward
+ // fallback is implemented. For now we just log the error and
continue.
+ }
+ }
+ } else {
+ LOG.warn("LogRotationTask could not acquire lock, skipping check
this time.");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // Preserve interrupt status
+ LOG.warn("LogRotationTask interrupted while trying to acquire lock.");
+ } finally {
+ if (acquired) {
+ lock.unlock();
+ }
+ }
+ }
+ }
+
+ protected static class Record {
+ public String tableName;
+ public long commitId;
+ public Mutation mutation;
+
+ public Record(String tableName, long commitId, Mutation mutation) {
+ this.tableName = tableName;
+ this.commitId = commitId;
+ this.mutation = mutation;
+ }
+ }
+
+ /** Event structure for the Disruptor ring buffer containing data and sync operations. */
+ protected static class LogEvent {
+ protected static final EventFactory<LogEvent> EVENT_FACTORY = LogEvent::new;
+
+ protected int type;
+ protected Record record;
+ protected CompletableFuture<Void> syncFuture; // Used only for SYNC events
+ protected long timestampNs; // Timestamp when event was created
+
+ public void setValues(int type, Record record, CompletableFuture<Void> syncFuture) {
+ this.type = type;
+ this.record = record;
+ this.syncFuture = syncFuture;
+ this.timestampNs = System.nanoTime();
+ }
+ }
+
+ /**
+ * Handles events from the Disruptor, managing batching, writer rotation, and error handling.
+ */
+ protected class LogEventHandler implements EventHandler<LogEvent> {
+ protected final int maxRetries; // Configurable max retries for sync
+ protected final long retryDelayMs; // Configurable delay between retries
+ protected final List<Record> currentBatch = new ArrayList<>();
+ protected final List<CompletableFuture<Void>> pendingSyncFutures = new ArrayList<>();
+ protected LogFileWriter writer;
+ protected long generation;
+
+ protected LogEventHandler() {
+ this.maxRetries =
+ conf.getInt(REPLICATION_LOG_SYNC_RETRIES_KEY, DEFAULT_REPLICATION_LOG_SYNC_RETRIES);
+ this.retryDelayMs =
+ conf.getLong(REPLICATION_LOG_RETRY_DELAY_MS_KEY, DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS);
+ }
+
+ protected void init() throws IOException {
+ this.writer = getWriter();
+ this.generation = writer.getGeneration();
+ }
+
+ /**
+ * Processes all pending sync operations by syncing the current writer and completing their
+ * associated futures. This method is called when we are ready to process a set of
+ * consolidated sync requests and performs the following steps:
+ * <ol>
+ * <li>Syncs the current writer to ensure all data is durably written.</li>
+ * <li>Completes all pending sync futures successfully.</li>
+ * <li>Clears the list of pending sync futures.</li>
+ * <li>Clears the current batch of records since they have been successfully synced.</li>
+ * </ol>
+ * @param sequence The sequence number of the last processed event
+ * @throws IOException if the sync operation fails
+ */
+ protected void processPendingSyncs(long sequence) throws IOException {
+ if (pendingSyncFutures.isEmpty()) {
+ return;
+ }
+ writer.sync();
+ // Complete all pending sync futures
+ for (CompletableFuture<Void> future : pendingSyncFutures) {
+ future.complete(null);
+ }
+ pendingSyncFutures.clear();
+ // Sync completed, clear the list of in-flight appends.
+ currentBatch.clear();
+ LOG.trace("Sync operation completed successfully up to sequence {}",
sequence);
+ }
+
+ /**
+ * Fails all pending sync operations with the given exception. This method is called when we
+ * encounter an unrecoverable error during the sync of the inner writer. It completes all of
+ * the consolidated pending sync futures exceptionally.
+ * <p>
+ * Note: This method does not clear the currentBatch list. The currentBatch must be preserved
+ * as it contains records that may need to be replayed if we successfully rotate to a new
+ * writer.
+ * @param sequence The sequence number of the last processed event
+ * @param e The IOException that caused the failure
+ */
+ protected void failPendingSyncs(long sequence, IOException e) {
+ if (pendingSyncFutures.isEmpty()) {
+ return;
+ }
+ for (CompletableFuture<Void> future : pendingSyncFutures) {
+ future.completeExceptionally(e);
+ }
+ pendingSyncFutures.clear();
+ LOG.warn("Failed to process syncs at sequence {}", sequence, e);
+ }
+
+ /**
+ * Processes a single event from the Disruptor ring buffer. This method handles both data
+ * and sync events, with retry logic for handling IO failures.
+ * <p>
+ * For data events, it:
+ * <ol>
+ * <li>Checks if the writer has been rotated and replays any in-flight records.</li>
+ * <li>Appends the record to the current writer.</li>
+ * <li>Adds the record to the current batch for potential replay.</li>
+ * <li>Processes any pending syncs if this is the end of a batch.</li>
+ * </ol>
+ * <p>
+ * For sync events, it:
+ * <ol>
+ * <li>Adds the sync future to the pending list.</li>
+ * <li>Processes any pending syncs if this is the end of a batch.</li>
+ * </ol>
+ * If an IOException occurs, the method will attempt to rotate the writer and retry the
+ * operation up to the configured maximum number of retries. If all retries fail, it will
+ * fail all pending syncs and throw the exception.
+ * <p>
+ * The retry logic includes a configurable delay between attempts to prevent tight loops
+ * when there are persistent HDFS issues. This delay helps mitigate the risk of rapid
+ * cycling through writers when the underlying storage system is experiencing problems.
+ * @param event The event to process
+ * @param sequence The sequence number of the event
+ * @param endOfBatch Whether this is the last event in the current batch
+ * @throws Exception if the operation fails after all retries
+ */
+ @Override
+ public void onEvent(LogEvent event, long sequence, boolean endOfBatch) throws Exception {
+ // Calculate time spent in ring buffer
+ long currentTimeNs = System.nanoTime();
+ long ringBufferTimeNs = currentTimeNs - event.timestampNs;
+ metrics.updateRingBufferTime(ringBufferTimeNs);
+ writer = getWriter();
+ int attempt = 0;
+ while (attempt < maxRetries) {
+ try {
+ if (writer.getGeneration() > generation) {
+ generation = writer.getGeneration();
+ // If the writer has been rotated, we need to replay the current batch of
+ // in-flight appends into the new writer.
+ if (!currentBatch.isEmpty()) {
+ LOG.trace("Writer has been rotated, replaying in-flight batch");
+ for (Record r : currentBatch) {
+ writer.append(r.tableName, r.commitId, r.mutation);
+ }
+ }
+ }
+ switch (event.type) {
+ case EVENT_TYPE_DATA:
+ writer.append(event.record.tableName, event.record.commitId, event.record.mutation);
+ // Add to current batch only after we succeed at appending, so we don't
+ // replay it twice.
+ currentBatch.add(event.record);
+ // Process any pending syncs at the end of batch.
+ if (endOfBatch) {
+ processPendingSyncs(sequence);
+ }
+ return;
+ case EVENT_TYPE_SYNC:
+ // Add this sync future to the pending list.
+ pendingSyncFutures.add(event.syncFuture);
+ // Process any pending syncs at the end of batch.
+ if (endOfBatch) {
+ processPendingSyncs(sequence);
+ }
+ return;
+ default:
+ throw new UnsupportedOperationException("Unknown event type: " + event.type);
+ }
+ } catch (IOException e) {
+ // IO exception, force a rotation.
+ LOG.debug("Attempt " + (attempt + 1) + "/" + maxRetries + " failed",
e);
+ if (attempt >= maxRetries) {
+ failPendingSyncs(sequence, e);
+ throw e;
+ }
+ attempt++;
+ // Add delay before retrying to prevent tight loops
+ try {
+ Thread.sleep(retryDelayMs);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Interrupted during retry delay");
+ }
+ writer = rotateLog(RotationReason.ERROR);
+ }
+ }
+ }
+ }
+
+ /**
+ * Handler for critical errors during the Disruptor lifecycle that closes the writer to
+ * prevent data loss.
+ */
+ protected class LogExceptionHandler implements ExceptionHandler<LogEvent> {
+ @Override
+ public void handleEventException(Throwable e, long sequence, LogEvent event) {
+ String message = "Exception processing sequence " + sequence + " for event " + event;
+ LOG.error(message, e);
+ closeOnError();
+ }
+
+ @Override
+ public void handleOnStartException(Throwable e) {
+ LOG.error("Exception during Disruptor startup", e);
+ closeOnError();
+ }
+
+ @Override
+ public void handleOnShutdownException(Throwable e) {
+ // Should not happen, but if it does, the regionserver is aborting or shutting down.
+ LOG.error("Exception during Disruptor shutdown", e);
+ closeOnError();
+ }
+ }
+
+}
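As a hedged illustration of the append/sync contract documented in the class Javadoc above, the following sketch shows how a caller might drive this API. The standby URL, server name, table, and cell contents are hypothetical and are not part of this patch; only the ReplicationLog methods and the REPLICATION_STANDBY_HDFS_URL_KEY constant come from the code above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.replication.ReplicationLog;

    public class ReplicationLogUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Point the log at the standby cluster's "IN" directory (hypothetical URL).
        conf.set(ReplicationLog.REPLICATION_STANDBY_HDFS_URL_KEY,
            "hdfs://standby-nn.example.com:8020/phoenix/replication/in");
        ReplicationLog log = ReplicationLog.get(conf,
            ServerName.valueOf("rs1.example.com", 16020, 1L));
        // Appends return quickly and are batched by the Disruptor.
        Put put = new Put(Bytes.toBytes("row1"));
        put.addColumn(Bytes.toBytes("0"), Bytes.toBytes("c"), Bytes.toBytes("v"));
        log.append("MY_TABLE", 1000L, put);
        // sync() blocks until everything appended so far is durable on the standby.
        log.sync();
        log.close();
      }
    }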
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
index 8560dc6ea3..0c883a4469 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/log/LogFileWriter.java
@@ -36,6 +36,13 @@ public class LogFileWriter implements LogFile.Writer {
private LogFileWriterContext context;
private LogFileFormatWriter writer;
private boolean closed = false;
+ /**
+ * A monotonically increasing sequence number that identifies this writer instance, used to
+ * detect log file rotations and ensure proper handling of in-flight operations. Higher layers
+ * will get a new generation number that is higher than any previous generation and store it
+ * in the LogFileWriter via setGeneration().
+ */
+ private long generation;
public LogFileWriter() {
@@ -45,6 +52,14 @@ public class LogFileWriter implements LogFile.Writer {
return context;
}
+ public void setGeneration(long generation) {
+ this.generation = generation;
+ }
+
+ public long getGeneration() {
+ return generation;
+ }
+
@Override
public void init(LogFileWriterContext context) throws IOException {
this.context = context;
@@ -110,7 +125,7 @@ public class LogFileWriter implements LogFile.Writer {
@Override
public String toString() {
return "LogFileWriter [writerContext=" + context + ", formatWriter=" +
writer + ", closed="
- + closed + "]";
+ + closed + ", generation=" + generation + "]";
}
}
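The generation number added here is what lets the event handler in ReplicationLog notice that a rotation happened between events. Below is a condensed, hedged fragment of that handshake (variable and method names are illustrative; the real logic is ReplicationLog.createNewWriter and LogEventHandler.onEvent in the first file of this patch):

    // Illustrative fragment only. Each new writer is stamped with a strictly larger generation:
    //   newWriter.setGeneration(writerGeneration.incrementAndGet());
    // and the consumer replays its in-flight batch when it sees a newer generation.
    if (writer.getGeneration() > lastSeenGeneration) {
      lastSeenGeneration = writer.getGeneration();
      // The previous file may have been abandoned mid-batch: replay appends that have
      // not yet been covered by a successful sync into the new writer.
      for (Record r : inFlightBatch) {
        writer.append(r.tableName, r.commitId, r.mutation);
      }
    }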
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSource.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSource.java
new file mode 100644
index 0000000000..907e3398fe
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSource.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+/** Interface for metrics related to ReplicationLog operations. */
+public interface MetricsReplicationLogSource extends BaseSource {
+
+ String METRICS_NAME = "ReplicationLog";
+ String METRICS_CONTEXT = "phoenix";
+ String METRICS_DESCRIPTION = "Metrics about Phoenix Replication Log Operations";
+ String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
+
+ String TIME_BASED_ROTATION_COUNT = "timeBasedRotationCount";
+ String TIME_BASED_ROTATION_COUNT_DESC = "Number of time-based log rotations";
+
+ String SIZE_BASED_ROTATION_COUNT = "sizeBasedRotationCount";
+ String SIZE_BASED_ROTATION_COUNT_DESC = "Number of size-based log rotations";
+
+ String ERROR_BASED_ROTATION_COUNT = "errorBasedRotationCount";
+ String ERROR_BASED_ROTATION_COUNT_DESC = "Number of times rotateLog was called due to errors";
+
+ String ROTATION_COUNT = "rotationCount";
+ String ROTATION_COUNT_DESC = "Total number of times rotateLog was called";
+
+ String ROTATION_FAILURES = "rotationFailures";
+
+ String ROTATION_FAILURES_DESC = "Number of times log rotation has failed";
+ String APPEND_TIME = "appendTimeMs";
+ String APPEND_TIME_DESC = "Histogram of time taken for append operations in nanoseconds";
+
+ String SYNC_TIME = "syncTimeMs";
+ String SYNC_TIME_DESC = "Histogram of time taken for sync operations in nanoseconds";
+
+ String ROTATION_TIME = "rotationTimeMs";
+ String ROTATION_TIME_DESC = "Histogram of time taken for log rotations in nanoseconds";
+
+ String RING_BUFFER_TIME = "ringBufferTime";
+ String RING_BUFFER_TIME_DESC = "Time events spend in the ring buffer";
+
+ /**
+ * Increments the counter for time-based log rotations. This counter tracks the number of times
+ * the log was rotated due to time threshold.
+ */
+ void incrementTimeBasedRotationCount();
+
+ /**
+ * Increments the counter for size-based log rotations. This counter tracks the number of times
+ * the log was rotated due to size threshold.
+ */
+ void incrementSizeBasedRotationCount();
+
+ /**
+ * Increments the counter for error-based log rotations. This counter tracks the number of times
+ * the log was rotated due to errors.
+ */
+ void incrementErrorBasedRotationCount();
+
+ /**
+ * Increments the counter for total log rotations. This counter tracks the total number of times
+ * the log was rotated, regardless of reason.
+ */
+ void incrementRotationCount();
+
+ /**
+ * Update the time taken for an append operation in nanoseconds.
+ * @param timeNs Time taken in nanoseconds
+ */
+ void updateAppendTime(long timeNs);
+
+ /**
+ * Update the time taken for a sync operation in nanoseconds.
+ * @param timeNs Time taken in nanoseconds
+ */
+ void updateSyncTime(long timeNs);
+
+ /**
+ * Update the time taken for a rotation operation in nanoseconds.
+ * @param timeNs Time taken in nanoseconds
+ */
+ void updateRotationTime(long timeNs);
+
+ /**
+ * Update the time an event spent in the ring buffer in nanoseconds.
+ * @param timeNs Time spent in nanoseconds
+ */
+ void updateRingBufferTime(long timeNs);
+
+ /**
+ * Increments the counter for log rotation failures. This counter tracks the number of times log
+ * rotation has failed.
+ */
+ void incrementRotationFailureCount();
+
+ // Get current values for testing
+ ReplicationLogMetricValues getCurrentMetricValues();
+
+}
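Since getCurrentMetricValues() exists chiefly to let tests observe counters and histogram maxima, here is a hedged sketch of how it might be exercised. The replicationLog variable is assumed to be an initialized ReplicationLog; the getter names on ReplicationLogMetricValues are not shown in this excerpt, so only the snapshot call is used.

    // Illustrative fragment: take a point-in-time snapshot of the replication log metrics.
    MetricsReplicationLogSource metrics = replicationLog.getMetrics();
    // ... drive some append()/sync() traffic against replicationLog here ...
    ReplicationLogMetricValues snapshot = metrics.getCurrentMetricValues();
    // The snapshot holds the counter values and histogram maxima at this point in time,
    // which a test can then assert against.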
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSourceImpl.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSourceImpl.java
new file mode 100644
index 0000000000..fcb08efde9
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSourceImpl.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.metrics;
+
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableHistogram;
+
+/** Implementation of metrics source for ReplicationLog operations. */
+public class MetricsReplicationLogSourceImpl extends BaseSourceImpl
+ implements MetricsReplicationLogSource {
+
+ private final MutableFastCounter timeBasedRotationCount;
+ private final MutableFastCounter sizeBasedRotationCount;
+ private final MutableFastCounter errorBasedRotationCount;
+ private final MutableFastCounter rotationCount;
+ private final MutableFastCounter rotationFailuresCount;
+ private final MutableHistogram appendTime;
+ private final MutableHistogram syncTime;
+ private final MutableHistogram rotationTime;
+ private final MutableHistogram ringBufferTime;
+
+ public MetricsReplicationLogSourceImpl() {
+ this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
+ }
+
+ public MetricsReplicationLogSourceImpl(String metricsName, String metricsDescription,
+ String metricsContext, String metricsJmxContext) {
+ super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+ timeBasedRotationCount = getMetricsRegistry().newCounter(TIME_BASED_ROTATION_COUNT,
+ TIME_BASED_ROTATION_COUNT_DESC, 0L);
+ sizeBasedRotationCount = getMetricsRegistry().newCounter(SIZE_BASED_ROTATION_COUNT,
+ SIZE_BASED_ROTATION_COUNT_DESC, 0L);
+ errorBasedRotationCount = getMetricsRegistry().newCounter(ERROR_BASED_ROTATION_COUNT,
+ ERROR_BASED_ROTATION_COUNT_DESC, 0L);
+ rotationCount = getMetricsRegistry().newCounter(ROTATION_COUNT, ROTATION_COUNT_DESC, 0L);
+ rotationFailuresCount =
+ getMetricsRegistry().newCounter(ROTATION_FAILURES, ROTATION_FAILURES_DESC, 0L);
+ appendTime = getMetricsRegistry().newHistogram(APPEND_TIME, APPEND_TIME_DESC);
+ syncTime = getMetricsRegistry().newHistogram(SYNC_TIME, SYNC_TIME_DESC);
+ rotationTime = getMetricsRegistry().newHistogram(ROTATION_TIME, ROTATION_TIME_DESC);
+ ringBufferTime = getMetricsRegistry().newHistogram(RING_BUFFER_TIME, RING_BUFFER_TIME_DESC);
+ }
+
+ @Override
+ public void incrementTimeBasedRotationCount() {
+ timeBasedRotationCount.incr();
+ }
+
+ @Override
+ public void incrementSizeBasedRotationCount() {
+ sizeBasedRotationCount.incr();
+ }
+
+ @Override
+ public void incrementErrorBasedRotationCount() {
+ errorBasedRotationCount.incr();
+ }
+
+ @Override
+ public void incrementRotationCount() {
+ rotationCount.incr();
+ }
+
+ @Override
+ public void incrementRotationFailureCount() {
+ rotationFailuresCount.incr();
+ }
+
+ @Override
+ public void updateAppendTime(long timeNs) {
+ appendTime.add(timeNs);
+ }
+
+ @Override
+ public void updateSyncTime(long timeNs) {
+ syncTime.add(timeNs);
+ }
+
+ @Override
+ public void updateRotationTime(long timeNs) {
+ rotationTime.add(timeNs);
+ }
+
+ @Override
+ public void updateRingBufferTime(long timeNs) {
+ ringBufferTime.add(timeNs);
+ }
+
+ @Override
+ public ReplicationLogMetricValues getCurrentMetricValues() {
+ return new ReplicationLogMetricValues(timeBasedRotationCount.value(),
+ sizeBasedRotationCount.value(), errorBasedRotationCount.value(),
rotationCount.value(),
+ rotationFailuresCount.value(), appendTime.getMax(), syncTime.getMax(),
rotationTime.getMax(),
+ ringBufferTime.getMax());
+ }
+
+ @Override
+ public String getMetricsName() {
+ return METRICS_NAME;
+ }
+
+ @Override
+ public String getMetricsDescription() {
+ return METRICS_DESCRIPTION;
+ }
+
+ @Override
+ public String getMetricsContext() {
+ return METRICS_CONTEXT;
+ }
+
+ @Override
+ public String getMetricsJmxContext() {
+ return METRICS_JMX_CONTEXT;
+ }
+
+}
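For orientation only (not part of the patch), here is a minimal sketch of how a caller might exercise this metrics source, using only the constructor and methods shown in the diff above; the wrapping class and main() harness are hypothetical.

```java
import org.apache.phoenix.replication.metrics.MetricsReplicationLogSourceImpl;
import org.apache.phoenix.replication.metrics.ReplicationLogMetricValues;

public class MetricsSketch {
  public static void main(String[] args) {
    // Constructing the impl registers it with the Hadoop metrics system via BaseSourceImpl.
    MetricsReplicationLogSourceImpl metrics = new MetricsReplicationLogSourceImpl();

    // Record a time-based rotation and an append latency sample (nanoseconds).
    metrics.incrementTimeBasedRotationCount();
    metrics.incrementRotationCount();
    metrics.updateAppendTime(1_000_000L);

    // Snapshot the current values, as the unit tests do. The time values reflect
    // the histogram maximums, per getCurrentMetricValues() above.
    ReplicationLogMetricValues values = metrics.getCurrentMetricValues();
    System.out.println("rotations=" + values.getRotationCount()
        + " maxAppendTimeNs=" + values.getAppendTime());
  }
}
```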
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
new file mode 100644
index 0000000000..23ee864747
--- /dev/null
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.metrics;
+
+/** Class to hold the values of all metrics tracked by the ReplicationLog
metrics source. */
+public class ReplicationLogMetricValues {
+
+ private final long timeBasedRotationCount;
+ private final long sizeBasedRotationCount;
+ private final long errorBasedRotationCount;
+ private final long rotationCount;
+ private final long rotationFailuresCount;
+ private final long appendTime;
+ private final long syncTime;
+ private final long rotationTime;
+ private final long ringBufferTime;
+
+ public ReplicationLogMetricValues(long timeBasedRotationCount, long
sizeBasedRotationCount,
+ long errorBasedRotationCount, long rotationCount, long
rotationFailuresCount, long appendTime,
+ long syncTime, long rotationTime, long ringBufferTime) {
+ this.timeBasedRotationCount = timeBasedRotationCount;
+ this.sizeBasedRotationCount = sizeBasedRotationCount;
+ this.errorBasedRotationCount = errorBasedRotationCount;
+ this.rotationCount = rotationCount;
+ this.rotationFailuresCount = rotationFailuresCount;
+ this.appendTime = appendTime;
+ this.syncTime = syncTime;
+ this.rotationTime = rotationTime;
+ this.ringBufferTime = ringBufferTime;
+ }
+
+ public long getTimeBasedRotationCount() {
+ return timeBasedRotationCount;
+ }
+
+ public long getSizeBasedRotationCount() {
+ return sizeBasedRotationCount;
+ }
+
+ public long getErrorBasedRotationCount() {
+ return errorBasedRotationCount;
+ }
+
+ public long getRotationCount() {
+ return rotationCount;
+ }
+
+ public long getRotationFailuresCount() {
+ return rotationFailuresCount;
+ }
+
+ public long getAppendTime() {
+ return appendTime;
+ }
+
+ public long getSyncTime() {
+ return syncTime;
+ }
+
+ public long getRotationTime() {
+ return rotationTime;
+ }
+
+ public long getRingBufferTime() {
+ return ringBufferTime;
+ }
+
+}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/package-info.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/package-info.java
new file mode 100644
index 0000000000..209787baef
--- /dev/null
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package provides implementations for tracking and reporting various
metrics associated
+ * with the replication process.
+ */
+package org.apache.phoenix.replication.metrics;
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/package-info.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/package-info.java
new file mode 100644
index 0000000000..3fba1505e0
--- /dev/null
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes and utilities for handling replication,
including replication log
+ * management and system catalog WAL entry filtering.
+ */
+package org.apache.phoenix.replication;
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
new file mode 100644
index 0000000000..d5b1bb43d8
--- /dev/null
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.tool;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.replication.log.LogFile.Record;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command-line tool for analyzing Phoenix Replication Log files. This tool can:
+ * <ul>
+ * <li>Read a single log file or a directory of log files</li>
+ * <li>Print file headers, trailers, and block headers</li>
+ * <li>Decode and display log record contents</li>
+ * <li>Verify checksums and report corruption</li>
+ * </ul>
+ */
+public class LogFileAnalyzer extends Configured implements Tool {
+ private static final Logger LOG =
LoggerFactory.getLogger(LogFileAnalyzer.class);
+
+ private static final String USAGE = "Usage: LogFileAnalyzer [options]
<path>\n" + "Options:\n"
+ + " -h, --help Show this help message\n"
+ + " -v, --verbose Show detailed information\n"
+ + " -c, --check Verify checksums and report corruption\n"
+ + " -d, --decode Decode and display record contents\n";
+
+ private boolean verbose = false;
+ private boolean decode = false;
+ private boolean check = false;
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (!parseArgs(args)) {
+ System.err.println(USAGE);
+ return 1;
+ }
+
+ Configuration conf = getConf();
+ if (conf == null) {
+ conf = HBaseConfiguration.create();
+ setConf(conf);
+ }
+
+ try {
+ FileSystem fs = FileSystem.get(conf);
+ Path path = new Path(args[args.length - 1]);
+
+ if (!fs.exists(path)) {
+ System.err.println("Path does not exist: " + path);
+ return 1;
+ }
+
+ List<Path> filesToAnalyze = new ArrayList<>();
+ if (fs.getFileStatus(path).isDirectory()) {
+ // Recursively find all .plog files
+ findLogFiles(fs, path, filesToAnalyze);
+ } else {
+ filesToAnalyze.add(path);
+ }
+
+ if (filesToAnalyze.isEmpty()) {
+ System.err.println("No log files found in: " + path);
+ return 1;
+ }
+
+ // Analyze each file
+ for (Path file : filesToAnalyze) {
+ analyzeFile(fs, file);
+ }
+
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Error analyzing log files", e);
+ return 1;
+ }
+ }
+
+ private void findLogFiles(FileSystem fs, Path dir, List<Path> files) throws
IOException {
+ FileStatus[] statuses = fs.listStatus(dir);
+ for (FileStatus status : statuses) {
+ Path path = status.getPath();
+ if (status.isDirectory()) {
+ findLogFiles(fs, path, files);
+ } else if (path.getName().endsWith(".plog")) {
+ files.add(path);
+ }
+ }
+ }
+
+ private void analyzeFile(FileSystem fs, Path file) throws IOException {
+ System.out.println("\nAnalyzing file: " + file);
+
+ LogFileReaderContext context = new
LogFileReaderContext(getConf()).setFileSystem(fs)
+ .setFilePath(file).setSkipCorruptBlocks(check); // Skip corrupt blocks
if checking
+
+ LogFileReader reader = new LogFileReader();
+ try {
+ reader.init(context);
+
+ // Print header information
+ System.out.println("Header:");
+ System.out.println(" Version: " + reader.getHeader().getMajorVersion()
+ "."
+ + reader.getHeader().getMinorVersion());
+
+ // Process records
+ int recordCount = 0;
+ Record record;
+ while ((record = reader.next()) != null) {
+ recordCount++;
+ if (decode) {
+ System.out.println("\nRecord #" + recordCount + ":");
+ System.out.println(" Table: " + record.getHBaseTableName());
+ System.out.println(" Commit ID: " + record.getCommitId());
+ System.out.println(" Mutation: " + record.getMutation());
+ if (verbose) {
+ System.out.println(" Serialized Length: " +
record.getSerializedLength());
+ }
+ }
+ }
+
+ // Print trailer information
+ System.out.println("\nTrailer:");
+ System.out.println(" Record Count: " +
reader.getTrailer().getRecordCount());
+ System.out.println(" Block Count: " +
reader.getTrailer().getBlockCount());
+ System.out.println(" Blocks Start Offset: " +
reader.getTrailer().getBlocksStartOffset());
+ System.out.println(" Trailer Start Offset: " +
reader.getTrailer().getTrailerStartOffset());
+
+ // Print verification results if checking
+ if (check) {
+ System.out.println("\nVerification Results:");
+ System.out.println(" Records Read: " + context.getRecordsRead());
+ System.out.println(" Blocks Read: " + context.getBlocksRead());
+ System.out.println(" Corrupt Blocks Skipped: " +
context.getCorruptBlocksSkipped());
+
+ if (context.getCorruptBlocksSkipped() > 0) {
+ System.out.println(" WARNING: File contains corrupt blocks!");
+ } else if (context.getRecordsRead() ==
reader.getTrailer().getRecordCount()) {
+ System.out.println(" File integrity verified successfully");
+ } else {
+ System.out.println(" WARNING: Record count mismatch!");
+ }
+ }
+ } finally {
+ reader.close();
+ }
+ }
+
+ private boolean parseArgs(String[] args) {
+ if (args.length == 0) {
+ return false;
+ }
+ for (int i = 0; i < args.length - 1; i++) {
+ String arg = args[i];
+ switch (arg) {
+ case "-h":
+ case "--help":
+ return false;
+ case "-v":
+ case "--verbose":
+ verbose = true;
+ break;
+ case "-c":
+ case "--check":
+ check = true;
+ break;
+ case "-d":
+ case "--decode":
+ decode = true;
+ break;
+ default:
+ if (arg.startsWith("-")) {
+ System.err.println("Unknown option: " + arg);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new LogFileAnalyzer(), args);
+ System.exit(res);
+ }
+
+}
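For reference (not part of the patch), a minimal sketch of driving the analyzer programmatically through Hadoop's ToolRunner, mirroring the main() method above; the wrapping class and the log file path are placeholders.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.replication.tool.LogFileAnalyzer;

public class AnalyzerSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Verify checksums (-c) and decode record contents (-d) for a hypothetical log file.
    int rc = ToolRunner.run(conf, new LogFileAnalyzer(),
        new String[] { "-c", "-d", "/path/to/log.plog" });
    System.exit(rc);
  }
}
```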
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/README.md
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/README.md
new file mode 100644
index 0000000000..c6afc361d9
--- /dev/null
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/README.md
@@ -0,0 +1,70 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Phoenix Replication Log File Analyzer
+
+A command-line tool for analyzing Phoenix Replication Log files.
+
+This tool can read a log file or a directory of log files, print the header,
+trailer (if present), and block headers, optionally decode and print the
+LogRecord contents in a human-readable format, verify checksums for each block,
+and report any corruption or format violations.
+
+## Usage
+
+```bash
+hadoop jar phoenix-server.jar \
+ org.apache.phoenix.replication.tool.LogFileAnalyzer [options] \
+ <log-file-or-directory>
+```
+
+### Options
+
+- `-h, --help`: Print help message
+- `-v, --verbose`: Print verbose output including block headers
+- `-c, --check`: Verify checksums and report any corruption
+- `-d, --decode`: Decode and print record contents in human-readable format
+
+### Examples
+
+#### Basic analysis of a log file:
+
+```bash
+hadoop jar phoenix-server.jar \
+ org.apache.phoenix.replication.tool.LogFileAnalyzer /path/to/log.plog
+```
+
+#### Analyze with record decoding:
+
+```bash
+hadoop jar phoenix-server.jar \
+ org.apache.phoenix.replication.tool.LogFileAnalyzer -d /path/to/log.plog
+```
+
+#### Verify checksums and report corruption:
+
+```bash
+hadoop jar phoenix-server.jar \
+ org.apache.phoenix.replication.tool.LogFileAnalyzer -c /path/to/log.plog
+```
+
+#### Analyze all log files in a directory with verbose output:
+
+```bash
+hadoop jar phoenix-server.jar \
+ org.apache.phoenix.replication.tool.LogFileAnalyzer -v /path/to/logs/
+```
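Beyond the CLI, the same `.plog` files can be read directly with the reader API the analyzer uses. The following is a minimal sketch (not part of the patch), assuming the LogFileReader and LogFileReaderContext classes shown earlier in this change; the file path is a placeholder.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.phoenix.replication.log.LogFile;
import org.apache.phoenix.replication.log.LogFileReader;
import org.apache.phoenix.replication.log.LogFileReaderContext;

public class ReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/path/to/log.plog"); // placeholder path

    LogFileReader reader = new LogFileReader();
    reader.init(new LogFileReaderContext(conf).setFileSystem(fs).setFilePath(path));
    try {
      // Iterate every record in the file, as LogFileAnalyzer does when decoding.
      LogFile.Record record;
      while ((record = reader.next()) != null) {
        System.out.println(record.getHBaseTableName() + " @ " + record.getCommitId());
      }
    } finally {
      reader.close();
    }
  }
}
```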
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/package-info.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/package-info.java
new file mode 100644
index 0000000000..5c512daa76
--- /dev/null
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package containing tools for Phoenix replication functionality.
+ */
+package org.apache.phoenix.replication.tool;
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
new file mode 100644
index 0000000000..ced1527400
--- /dev/null
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
@@ -0,0 +1,1344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.phoenix.replication.ReplicationLog.RotationReason;
+import org.apache.phoenix.replication.log.LogFile;
+import org.apache.phoenix.replication.log.LogFileReader;
+import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.apache.phoenix.replication.log.LogFileTestUtil;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogSource;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogSourceImpl;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplicationLogTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReplicationLogTest.class);
+
+ @ClassRule
+ public static TemporaryFolder testFolder = new TemporaryFolder();
+
+ private Configuration conf;
+ private ServerName serverName;
+ private FileSystem localFs;
+ private URI standbyUri;
+ private ReplicationLog logWriter;
+
+ static final int TEST_RINGBUFFER_SIZE = 32;
+ static final int TEST_SYNC_TIMEOUT = 1000;
+ static final int TEST_ROTATION_TIME = 5000;
+ static final int TEST_ROTATION_SIZE_BYTES = 10 * 1024;
+
+ @Before
+ public void setUp() throws IOException {
+ conf = HBaseConfiguration.create();
+ localFs = FileSystem.getLocal(conf);
+ standbyUri = new Path(testFolder.toString()).toUri();
+ serverName = ServerName.valueOf("test", 60010,
EnvironmentEdgeManager.currentTimeMillis());
+ conf.set(ReplicationLog.REPLICATION_STANDBY_HDFS_URL_KEY,
standbyUri.toString());
+ // Small ring buffer size for testing
+ conf.setInt(ReplicationLog.REPLICATION_LOG_RINGBUFFER_SIZE_KEY,
TEST_RINGBUFFER_SIZE);
+ // Set a short sync timeout for testing
+ conf.setLong(ReplicationLog.REPLICATION_LOG_SYNC_TIMEOUT_KEY,
TEST_SYNC_TIMEOUT);
+ // Set a short rotation time for testing
+ conf.setLong(ReplicationLog.REPLICATION_LOG_ROTATION_TIME_MS_KEY,
TEST_ROTATION_TIME);
+ // Small size threshold for testing
+ conf.setLong(ReplicationLog.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
TEST_ROTATION_SIZE_BYTES);
+
+ logWriter = spy(new TestableReplicationLog(conf, serverName));
+ logWriter.init();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (logWriter != null) {
+ logWriter.close();
+ }
+ // Deregister the metrics source that the replication log registers during
initialization
+ // so the next test will be able to register it again and successfully initialize.
+ DefaultMetricsSystem.instance()
+ .unregisterSource(MetricsReplicationLogSource.METRICS_JMX_CONTEXT);
+ }
+
+ /**
+ * Tests basic append and sync functionality of the replication log.
Verifies that mutations are
+ * correctly appended to the log and that sync operations properly commit
the changes to disk.
+ */
+ @Test
+ public void testAppendAndSync() throws Exception {
+ final String tableName = "TESTTBL";
+ final long commitId1 = 1L;
+ final long commitId2 = 2L;
+ final long commitId3 = 3L;
+ final long commitId4 = 4L;
+ final long commitId5 = 5L;
+ final Mutation put1 = LogFileTestUtil.newPut("row1", 1, 1);
+ final Mutation put2 = LogFileTestUtil.newPut("row2", 2, 1);
+ final Mutation put3 = LogFileTestUtil.newPut("row3", 3, 1);
+ final Mutation put4 = LogFileTestUtil.newPut("row4", 4, 1);
+ final Mutation put5 = LogFileTestUtil.newPut("row5", 5, 1);
+
+ // Get the inner writer
+ LogFileWriter writer = logWriter.getWriter();
+ assertNotNull("Writer should not be null", writer);
+ InOrder inOrder = Mockito.inOrder(writer);
+
+ logWriter.append(tableName, commitId1, put1);
+ logWriter.append(tableName, commitId2, put2);
+ logWriter.append(tableName, commitId3, put3);
+ logWriter.append(tableName, commitId4, put4);
+ logWriter.append(tableName, commitId5, put5);
+
+ logWriter.sync();
+
+ // Happens-before ordering verification, using Mockito's inOrder. Verify that the appends
+ // were applied in order and that the sync happened after all of the appends.
+ inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId1),
eq(put1));
+ inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId2),
eq(put2));
+ inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId3),
eq(put3));
+ inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId4),
eq(put4));
+ inOrder.verify(writer, times(1)).append(eq(tableName), eq(commitId5),
eq(put5));
+ inOrder.verify(writer, times(1)).sync();
+ }
+
+ /**
+ * Tests the behavior when an append operation fails. Verifies that the
system properly handles
+ * append failures by rolling to a new writer and retrying the operation.
+ */
+ @Test
+ public void testAppendFailureAndRetry() throws Exception {
+ final String tableName = "TBLAFR";
+ final long commitId = 1L;
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+ // Get the inner writer
+ LogFileWriter writerBeforeRoll = logWriter.getWriter();
+ assertNotNull("Initial writer should not be null", writerBeforeRoll);
+
+ // Configure writerBeforeRoll to fail on the first append call
+ doThrow(new IOException("Simulated append
failure")).when(writerBeforeRoll).append(anyString(),
+ anyLong(), any(Mutation.class));
+
+ // Append data
+ logWriter.append(tableName, commitId, put);
+ logWriter.sync();
+
+ // Get the inner writer we rolled to.
+ LogFileWriter writerAfterRoll = logWriter.getWriter();
+ assertNotNull("Rolled writer should not be null", writerAfterRoll);
+
+ // Verify the sequence: append (fail), rotate, append (succeed), sync
+ InOrder inOrder = Mockito.inOrder(writerBeforeRoll, writerAfterRoll);
+ inOrder.verify(writerBeforeRoll, times(1)).append(eq(tableName),
eq(commitId), eq(put));
+ inOrder.verify(writerBeforeRoll, times(0)).sync(); // Append failed, so sync was not attempted
+ inOrder.verify(writerAfterRoll, times(1)).append(eq(tableName),
eq(commitId), eq(put)); // Retry
+ inOrder.verify(writerAfterRoll, times(1)).sync();
+ }
+
+ /**
+ * Tests the behavior when a sync operation fails. Verifies that the system
properly handles sync
+ * failures by rolling to a new writer and retrying the operation.
+ */
+ @Test
+ public void testSyncFailureAndRetry() throws Exception {
+ final String tableName = "TBLSFR";
+ final long commitId = 1L;
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+ // Get the inner writer
+ LogFileWriter writerBeforeRoll = logWriter.getWriter();
+ assertNotNull("Initial writer should not be null", writerBeforeRoll);
+
+ // Configure writerBeforeRoll to fail on the first sync call
+ doThrow(new IOException("Simulated sync
failure")).when(writerBeforeRoll).sync();
+
+ // Append data
+ logWriter.append(tableName, commitId, put);
+ logWriter.sync();
+
+ // Get the inner writer we rolled to.
+ LogFileWriter writerAfterRoll = logWriter.getWriter();
+ assertNotNull("Initial writer should not be null", writerBeforeRoll);
+
+ // Verify the sequence: append, sync (fail), rotate, append (retry), sync
(succeed)
+ InOrder inOrder = Mockito.inOrder(writerBeforeRoll, writerAfterRoll);
+ inOrder.verify(writerBeforeRoll, times(1)).append(eq(tableName),
eq(commitId), eq(put));
+ inOrder.verify(writerBeforeRoll, times(1)).sync(); // Failed
+ inOrder.verify(writerAfterRoll, times(1)).append(eq(tableName),
eq(commitId), eq(put)); // Replay
+ inOrder.verify(writerAfterRoll, times(1)).sync(); // Succeeded
+ }
+
+ /**
+ * Tests the blocking behavior when the ring buffer is full. Verifies that
append operations block
+ * when the ring buffer is full and resume as soon as space becomes
available again.
+ */
+ @Test
+ public void testBlockingWhenRingFull() throws Exception {
+ final String tableName = "TBLBWRF";
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+ long commitId = 0;
+
+ // Get the inner writer
+ LogFileWriter innerWriter = logWriter.getWriter();
+ assertNotNull("Inner writer should not be null", innerWriter);
+
+ // Create a slow consumer to fill up the ring buffer.
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Thread.sleep(50); // Simulate slow processing
+ return invocation.callRealMethod();
+ }
+ }).when(innerWriter).append(anyString(), anyLong(), any(Mutation.class));
+
+ // Fill up the ring buffer by sending enough events.
+ for (int i = 0; i < TEST_RINGBUFFER_SIZE; i++) {
+ logWriter.append(tableName, commitId++, put);
+ }
+
+ // Now try to append when the ring is full. This should block until space
becomes
+ // available.
+ long myCommitId = commitId++;
+ CompletableFuture<Void> startFuture = new CompletableFuture<>();
+ CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+ Thread appendThread = new Thread(() -> {
+ try {
+ startFuture.complete(null);
+ logWriter.append(tableName, myCommitId, put);
+ appendFuture.complete(null);
+ } catch (IOException e) {
+ appendFuture.completeExceptionally(e);
+ }
+ });
+ appendThread.start();
+
+ // Wait for the append thread to start.
+ startFuture.get();
+
+ // Verify the append is still blocked
+ assertFalse("Append should be blocked when ring is full",
appendFuture.isDone());
+
+ // Let some events process to free up space.
+ Thread.sleep(100);
+
+ // Now the append should complete. Any issues and we will time out here.
+ appendFuture.get();
+ assertTrue("Append should have completed", appendFuture.isDone());
+
+ // Verify the append eventually happens on the writer.
+ verify(innerWriter, timeout(10000).times(1)).append(eq(tableName),
eq(myCommitId), any());
+ }
+
+ /**
+ * Tests the sync timeout behavior. Verifies that sync operations time out
after the configured
+ * interval if they cannot complete.
+ */
+ @Test
+ public void testSyncTimeout() throws Exception {
+ final String tableName = "TBLST";
+ final long commitId = 1L;
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+ // Get the inner writer
+ LogFileWriter innerWriter = logWriter.getWriter();
+ assertNotNull("Inner writer should not be null", innerWriter);
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ // Pause long enough to cause a timeout.
+ Thread.sleep((long) (TEST_SYNC_TIMEOUT * 1.25));
+ return invocation.callRealMethod();
+ }
+ }).when(innerWriter).sync();
+
+ // Append some data
+ logWriter.append(tableName, commitId, put);
+
+ // Try to sync and expect it to timeout
+ try {
+ logWriter.sync();
+ fail("Expected sync to timeout");
+ } catch (IOException e) {
+ assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
+ }
+ }
+
+ /**
+ * Tests concurrent append operations from multiple producers. Verifies that
the system correctly
+ * handles concurrent appends from multiple threads and maintains data
consistency.
+ */
+ @Test
+ public void testConcurrentProducers() throws Exception {
+ final String tableName = "TBLCP";
+ final int APPENDS_PER_THREAD = 1000;
+ // Create a latch to coordinate thread starts
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ // Create a latch to track completion of all appends
+ final CountDownLatch completionLatch = new CountDownLatch(2);
+
+ // Get the inner writer
+ LogFileWriter innerWriter = logWriter.getWriter();
+ assertNotNull("Inner writer should not be null", innerWriter);
+
+ // Thread 1: Append mutations with even commit IDs
+ Thread producerEven = new Thread(() -> {
+ try {
+ startLatch.await(); // Wait for start signal
+ for (int i = 0; i < APPENDS_PER_THREAD; i++) {
+ final long commitId = i * 2;
+ final Mutation put = LogFileTestUtil.newPut("row" + commitId,
commitId, 1);
+ logWriter.append(tableName, commitId, put);
+ }
+ } catch (Exception e) {
+ fail("Producer 1 failed: " + e.getMessage());
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+
+ // Thread 2: Append mutations with odd commit IDs
+ Thread producerOdd = new Thread(() -> {
+ try {
+ startLatch.await(); // Wait for start signal
+ for (int i = 0; i < APPENDS_PER_THREAD; i++) {
+ final long commitId = i * 2 + 1;
+ final Mutation put = LogFileTestUtil.newPut("row" + commitId,
commitId, 1);
+ logWriter.append(tableName, commitId, put);
+ }
+ } catch (Exception e) {
+ fail("Producer 2 failed: " + e.getMessage());
+ } finally {
+ completionLatch.countDown();
+ }
+ });
+
+ // Start both threads.
+ producerEven.start();
+ producerOdd.start();
+ // Signal threads to start.
+ startLatch.countDown();
+ // Wait for all appends to complete
+ completionLatch.await();
+
+ // Perform a sync to ensure all appends are processed.
+ InOrder inOrder = Mockito.inOrder(innerWriter); // To verify the below
sync.
+ logWriter.sync();
+ // Verify the final sync was called.
+ inOrder.verify(innerWriter, times(1)).sync();
+
+ // Verify that all of the appends were processed by the internal writer.
+ for (int i = 0; i < APPENDS_PER_THREAD * 2; i++) {
+ final long commitId = i;
+ verify(innerWriter, times(1)).append(eq(tableName), eq(commitId), any());
+ }
+
+ }
+
+ /**
+ * Tests time-based log rotation. Verifies that the log file is rotated
after the configured
+ * rotation time period and that operations continue correctly with the new
log file.
+ */
+ @Test
+ public void testTimeBasedRotation() throws Exception {
+ final String tableName = "TBLTBR";
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+ final long commitId = 1L;
+
+ // Get the initial writer
+ LogFileWriter writerBeforeRotation = logWriter.getWriter();
+ assertNotNull("Initial writer should not be null", writerBeforeRotation);
+
+ // Append some data
+ logWriter.append(tableName, commitId, put);
+ logWriter.sync();
+
+ // Wait for rotation time to elapse
+ Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
+
+ // Append more data to trigger rotation check
+ logWriter.append(tableName, commitId + 1, put);
+ logWriter.sync();
+
+ // Get the new writer after rotation
+ LogFileWriter writerAfterRotation = logWriter.getWriter();
+ assertNotNull("New writer should not be null", writerAfterRotation);
+ assertTrue("Writer should have been rotated", writerAfterRotation !=
writerBeforeRotation);
+
+ // Verify the sequence of operations
+ InOrder inOrder = Mockito.inOrder(writerBeforeRotation, writerAfterRotation);
+ // First append went to the initial writer
+ inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName), eq(commitId), eq(put));
+ inOrder.verify(writerBeforeRotation, times(1)).sync();
+ // The first append is not replayed to the new writer
+ inOrder.verify(writerAfterRotation, times(0)).append(eq(tableName), eq(commitId), eq(put));
+ // Second append went to the new writer
+ inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName), eq(commitId + 1), eq(put));
+ inOrder.verify(writerAfterRotation, times(1)).sync();
+ }
+
+ /**
+ * Tests size-based log rotation. Verifies that the log file is rotated when
it exceeds the
+ * configured size threshold and that operations continue correctly with the
new log file.
+ */
+ @Test
+ public void testSizeBasedRotation() throws Exception {
+ final String tableName = "TBLSBR";
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 10);
+ long commitId = 1L;
+
+ LogFileWriter writerBeforeRotation = logWriter.getWriter();
+ assertNotNull("Initial writer should not be null", writerBeforeRotation);
+
+ // Append enough data so that we exceed the size threshold.
+ for (int i = 0; i < 100; i++) {
+ logWriter.append(tableName, commitId++, put);
+ }
+ logWriter.sync(); // Should trigger a size-based rotation
+
+ // Get the new writer after the expected rotation.
+ LogFileWriter writerAfterRotation = logWriter.getWriter();
+ assertNotNull("New writer should not be null", writerAfterRotation);
+ assertTrue("Writer should have been rotated", writerAfterRotation !=
writerBeforeRotation);
+
+ // Append one more mutation to verify we're using the new writer.
+ logWriter.append(tableName, commitId, put);
+ logWriter.sync();
+
+ // Verify the sequence of operations
+ InOrder inOrder = Mockito.inOrder(writerBeforeRotation,
writerAfterRotation);
+ // Verify all appends before rotation went to the first writer.
+ for (int i = 1; i < commitId; i++) {
+ inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName),
eq((long) i), eq(put));
+ }
+ inOrder.verify(writerBeforeRotation, times(1)).sync();
+ // Verify the final append went to the new writer.
+ inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName),
eq(commitId), eq(put));
+ inOrder.verify(writerAfterRotation, times(1)).sync();
+ }
+
+ /**
+ * Tests the close operation of the replication log. Verifies that the log
properly closes its
+ * resources and prevents further operations after being closed.
+ */
+ @Test
+ public void testClose() throws Exception {
+ final String tableName = "TBLCLOSE";
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+ final long commitId = 1L;
+
+ // Get the inner writer
+ LogFileWriter innerWriter = logWriter.getWriter();
+ assertNotNull("Inner writer should not be null", innerWriter);
+
+ // Append some data
+ logWriter.append(tableName, commitId, put);
+
+ // Close the log writer
+ logWriter.close();
+
+ // Verify the inner writer was closed
+ verify(innerWriter, times(1)).close();
+
+ // Verify we can't append after close
+ try {
+ logWriter.append(tableName, commitId + 1, put);
+ fail("Expected append to fail after close");
+ } catch (IOException e) {
+ // Expected
+ }
+
+ // Verify we can't sync after close
+ try {
+ logWriter.sync();
+ fail("Expected sync to fail after close");
+ } catch (IOException e) {
+ // Expected
+ }
+
+ // Verify we can close multiple times without error
+ logWriter.close();
+ }
+
+ /**
+ * Tests the automatic rotation task. Verifies that the background rotation
task correctly rotates
+ * log files based on the configured rotation time.
+ */
+ @Test
+ public void testRotationTask() throws Exception {
+ final String tableName = "TBLRT";
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+ long commitId = 1L;
+
+ LogFileWriter writerBeforeRotation = logWriter.getWriter();
+ assertNotNull("Initial writer should not be null", writerBeforeRotation);
+
+ // Append some data and wait for the rotation time to elapse plus a small
buffer.
+ logWriter.append(tableName, commitId, put);
+ logWriter.sync();
+ Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
+
+ // Get the new writer after the rotation.
+ LogFileWriter writerAfterRotation = logWriter.getWriter();
+ assertNotNull("New writer should not be null", writerAfterRotation);
+ assertTrue("Writer should have been rotated", writerAfterRotation !=
writerBeforeRotation);
+
+ // Verify first append and sync went to initial writer
+ verify(writerBeforeRotation, times(1)).append(eq(tableName), eq(1L),
eq(put));
+ verify(writerBeforeRotation, times(1)).sync();
+ // Verify the initial writer was closed
+ verify(writerBeforeRotation, times(1)).close();
+ }
+
+ /**
+ * Tests behavior when log rotation fails temporarily but eventually
succeeds. Verifies that:
+ * <ul>
+ * <li>The system can handle temporary rotation failures</li>
+ * <li>After an initial failure, the next rotation attempt succeeds</li>
+ * <li>Operations continue correctly with the new writer after successful
rotation</li>
+ * <li>The metrics for rotation failures are properly tracked</li>
+ * <li>Operations can continue with the current writer while rotation
attempts are failing</li>
+ * </ul>
+ * <p>
+ * This test simulates a scenario where the first rotation attempt fails (e.g., due to
+ * temporary HDFS issues) but the next attempt succeeds. This is a common real-world scenario
+ * where transient failures occur but the system eventually recovers. During
the failed rotation
+ * attempts, the system should continue to operate normally with the current
writer.
+ */
+ @Test
+ public void testFailedRotation() throws Exception {
+ final String tableName = "TBLFR";
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+ long commitId = 1L;
+
+ // Get the initial writer
+ LogFileWriter initialWriter = logWriter.getWriter();
+ assertNotNull("Initial writer should not be null", initialWriter);
+
+ // Configure the log writer to fail only the first time when creating new
writers.
+ AtomicBoolean shouldFail = new AtomicBoolean(true);
+ doAnswer(invocation -> {
+ if (shouldFail.getAndSet(false)) {
+ throw new IOException("Simulated failure to create new writer");
+ }
+ return invocation.callRealMethod();
+ }).when(logWriter).createNewWriter(any(FileSystem.class), any(URI.class));
+
+ // Append some data
+ logWriter.append(tableName, commitId, put);
+ logWriter.sync();
+
+ // Rotate the log.
+ LogFileWriter writerAfterFailedRotate =
logWriter.rotateLog(RotationReason.TIME);
+ assertEquals("Should still be using the initial writer", initialWriter,
+ writerAfterFailedRotate);
+
+ // While rotation is failing, verify we can continue to use the current
writer.
+ logWriter.append(tableName, commitId + 1, put);
+ logWriter.sync();
+
+ LogFileWriter writerAfterRotate = logWriter.rotateLog(RotationReason.TIME);
+ assertNotEquals("Should be using a new writer", initialWriter,
writerAfterRotate);
+
+ // Try to append more data. This should work with the new writer after
successful rotation.
+ logWriter.append(tableName, commitId + 2, put);
+ logWriter.sync();
+
+ // Verify operations went to the writers in the correct order
+ InOrder inOrder = Mockito.inOrder(initialWriter, writerAfterRotate);
+ // First append and sync on initial writer.
+ inOrder.verify(initialWriter).append(eq(tableName), eq(commitId), eq(put));
+ inOrder.verify(initialWriter).sync();
+ // Second append and sync on initial writer after failed rotation.
+ inOrder.verify(initialWriter).append(eq(tableName), eq(commitId + 1),
eq(put));
+ inOrder.verify(initialWriter).sync();
+ // Final append and sync on new writer after successful rotation.
+ inOrder.verify(writerAfterRotate).append(eq(tableName), eq(commitId + 2),
eq(put));
+ inOrder.verify(writerAfterRotate).sync();
+ }
+
+ /**
+ * This test simulates a scenario where rotation consistently fails and
verifies that the system
+ * properly propagates an exception after exhausting all retry attempts.
+ */
+ @Test
+ public void testTooManyRotationFailures() throws Exception {
+ final String tableName = "TBLTMRF";
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+ long commitId = 1L;
+
+ LogFileWriter initialWriter = logWriter.getWriter();
+ assertNotNull("Initial writer should not be null", initialWriter);
+
+ // Configure the log writer to always fail when creating new writers
+ doThrow(new IOException("Simulated failure to create new
writer")).when(logWriter)
+ .createNewWriter(any(FileSystem.class), any(URI.class));
+
+ // Append some data
+ logWriter.append(tableName, commitId, put);
+ logWriter.sync();
+
+ // Try to rotate the log multiple times until we exceed the retry limit
+ for (int i = 0; i <=
ReplicationLog.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES; i++) {
+ try {
+ logWriter.rotateLog(RotationReason.TIME);
+ } catch (IOException e) {
+ if (i < ReplicationLog.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES) {
+ // Not the last attempt yet, continue
+ continue;
+ }
+ // This was the last attempt, verify the exception
+ assertTrue("Expected IOException", e instanceof IOException);
+ assertTrue("Expected our mocked failure cause",
+ e.getMessage().contains("Simulated failure"));
+
+ }
+ }
+
+ // Verify subsequent operations fail because the log is closed
+ try {
+ logWriter.append(tableName, commitId + 1, put);
+ logWriter.sync();
+ fail("Expected append to fail because log is closed");
+ } catch (IOException e) {
+ assertTrue("Expected an IOException because log is closed",
+ e.getMessage().contains("Closed"));
+ }
+ }
+
+ /**
+ * Tests handling of critical exceptions during event processing. Verifies
that the system
+ * properly handles critical errors by closing the log and preventing
further operations.
+ */
+ @Test
+ public void testEventProcessingException() throws Exception {
+ final String tableName = "TBLEPE";
+ final long commitId = 1L;
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+ // Get the inner writer
+ LogFileWriter innerWriter = logWriter.getWriter();
+ assertNotNull("Writer should not be null", innerWriter);
+
+ // Configure writer to throw a RuntimeException on append
+ doThrow(new RuntimeException("Simulated critical
error")).when(innerWriter).append(anyString(),
+ anyLong(), any(Mutation.class));
+
+ // Append data. This should trigger the LogExceptionHandler, which will
close logWriter.
+ logWriter.append(tableName, commitId, put);
+ try {
+ logWriter.sync();
+ fail("Should have thrown IOException because sync timed out");
+ } catch (IOException e) {
+ assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
+ }
+
+ // Verify that subsequent operations fail because the log is closed
+ try {
+ logWriter.append(tableName, commitId + 1, put);
+ fail("Should have thrown IOException because log is closed");
+ } catch (IOException e) {
+ assertTrue("Expected an IOException because log is closed",
+ e.getMessage().contains("Closed"));
+ }
+
+ // Verify that the inner writer was closed by the LogExceptionHandler
+ verify(innerWriter, times(1)).close();
+ }
+
+ /**
+ * Tests behavior when all sync retry attempts are exhausted. Verifies that
the system properly
+ * handles the case where sync operations fail repeatedly and eventually
timeout.
+ */
+ @Test
+ public void testSyncFailureAllRetriesExhausted() throws Exception {
+ final String tableName = "TBLSAFR";
+ final long commitId = 1L;
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+ // Get the initial writer
+ LogFileWriter initialWriter = logWriter.getWriter();
+ assertNotNull("Initial writer should not be null", initialWriter);
+
+ // Configure initial writer to fail on sync
+ doThrow(new IOException("Simulated sync
failure")).when(initialWriter).sync();
+
+ // createNewWriter should keep returning the bad writer
+ doAnswer(invocation ->
initialWriter).when(logWriter).createNewWriter(any(FileSystem.class),
+ any(URI.class));
+
+ // Append data
+ logWriter.append(tableName, commitId, put);
+
+ // Try to sync. Should fail after exhausting retries.
+ try {
+ logWriter.sync();
+ fail("Expected sync to fail after exhausting retries");
+ } catch (IOException e) {
+ assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
+ }
+
+ // Each retry creates a new writer, so that is at least 1 create + 5
retries.
+ verify(logWriter, atLeast(6)).createNewWriter(any(FileSystem.class),
any(URI.class));
+ }
+
+ /**
+ * Tests log rotation behavior during batch operations. Verifies that the
system correctly handles
+ * rotation when there are pending batch operations, ensuring no data loss.
+ */
+ @Test
+ public void testRotationDuringBatch() throws Exception {
+ final String tableName = "TBLRDB";
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+ long commitId = 1L;
+
+ // Get the initial writer
+ LogFileWriter writerBeforeRotation = logWriter.getWriter();
+ assertNotNull("Initial writer should not be null", writerBeforeRotation);
+
+ // Append several items to fill currentBatch but don't sync yet
+ for (int i = 0; i < 5; i++) {
+ logWriter.append(tableName, commitId + i, put);
+ }
+
+ // Force a rotation by waiting for rotation time to elapse
+ Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
+
+ // Get the new writer after rotation
+ LogFileWriter writerAfterRotation = logWriter.getWriter();
+ assertNotNull("New writer should not be null", writerAfterRotation);
+ assertTrue("Writer should have been rotated", writerAfterRotation !=
writerBeforeRotation);
+
+ // Now trigger a sync which should replay the currentBatch to the new
writer
+ logWriter.sync();
+
+ // Verify the sequence of operations
+ InOrder inOrder = Mockito.inOrder(writerBeforeRotation,
writerAfterRotation);
+
+ // Verify all appends before rotation went to the first writer
+ for (int i = 0; i < 5; i++) {
+ inOrder.verify(writerBeforeRotation, times(1)).append(eq(tableName),
eq(commitId + i),
+ eq(put));
+ }
+
+ // Verify the currentBatch was replayed to the new writer
+ for (int i = 0; i < 5; i++) {
+ inOrder.verify(writerAfterRotation, times(1)).append(eq(tableName),
eq(commitId + i),
+ eq(put));
+ }
+
+ // Verify sync happened on the new writer
+ inOrder.verify(writerAfterRotation, times(1)).sync();
+
+ // Verify the initial writer was closed
+ verify(writerBeforeRotation, times(1)).close();
+ }
+
+ /**
+ * Tests reading records after writing them to the log. Verifies that
records written to the log
+ * can be correctly read back and match the original data.
+ */
+ @Test
+ public void testReadAfterWrite() throws Exception {
+ final String tableName = "TBLRAW";
+ final int NUM_RECORDS = 100;
+ List<LogFile.Record> originalRecords = new ArrayList<>();
+
+ // Get the path of the log file.
+ Path logPath = logWriter.getWriter().getContext().getFilePath();
+
+ for (int i = 0; i < NUM_RECORDS; i++) {
+ LogFile.Record record = LogFileTestUtil.newPutRecord(tableName, i, "row"
+ i, i, 1);
+ originalRecords.add(record);
+ logWriter.append(record.getHBaseTableName(), record.getCommitId(),
record.getMutation());
+ }
+ logWriter.sync(); // Sync to commit the appends to the current writer.
+
+ // Force a rotation to close the current writer.
+ logWriter.rotateLog(ReplicationLog.RotationReason.SIZE);
+
+ assertTrue("Log file should exist", localFs.exists(logPath));
+
+ // Read and verify all records
+ LogFileReader reader = new LogFileReader();
+ LogFileReaderContext readerContext =
+ new
LogFileReaderContext(conf).setFileSystem(localFs).setFilePath(logPath);
+ reader.init(readerContext);
+
+ List<LogFile.Record> readRecords = new ArrayList<>();
+ LogFile.Record record;
+ while ((record = reader.next()) != null) {
+ readRecords.add(record);
+ }
+
+ reader.close();
+
+ // Verify we have the expected number of records.
+ assertEquals("Number of records mismatch", NUM_RECORDS,
readRecords.size());
+
+ // Verify each record matches the original.
+ for (int i = 0; i < NUM_RECORDS; i++) {
+ LogFileTestUtil.assertRecordEquals("Record mismatch at index " + i,
originalRecords.get(i),
+ readRecords.get(i));
+ }
+ }
+
+ /**
+ * Tests reading records after multiple log rotations. Verifies that records
can be correctly read
+ * across multiple log files after several rotations, maintaining data
consistency.
+ */
+ @Test
+ public void testReadAfterMultipleRotations() throws Exception {
+ final String tableName = "TBLRAMR";
+ final int NUM_RECORDS_PER_ROTATION = 100;
+ final int NUM_ROTATIONS = 10;
+ final int TOTAL_RECORDS = NUM_RECORDS_PER_ROTATION * NUM_ROTATIONS;
+ List<LogFile.Record> originalRecords = new ArrayList<>();
+ List<Path> logPaths = new ArrayList<>();
+
+ // Write records across multiple rotations.
+ for (int rotation = 0; rotation < NUM_ROTATIONS; rotation++) {
+ // Get the path of the current log file.
+ Path logPath = logWriter.getWriter().getContext().getFilePath();
+ logPaths.add(logPath);
+
+ for (int i = 0; i < NUM_RECORDS_PER_ROTATION; i++) {
+ int commitId = (rotation * NUM_RECORDS_PER_ROTATION) + i;
+ LogFile.Record record =
+ LogFileTestUtil.newPutRecord(tableName, commitId, "row" + commitId,
commitId, 1);
+ originalRecords.add(record);
+ logWriter.append(record.getHBaseTableName(), record.getCommitId(),
record.getMutation());
+ }
+ logWriter.sync(); // Sync to commit the appends to the current writer.
+ // Force a rotation to close the current writer.
+ logWriter.rotateLog(ReplicationLog.RotationReason.SIZE);
+ }
+
+ // Verify all log files exist
+ for (Path logPath : logPaths) {
+ assertTrue("Log file should exist: " + logPath, localFs.exists(logPath));
+ }
+
+ // Read and verify all records from each log file, in the order in which
the log files
+ // were written.
+ List<LogFile.Record> readRecords = new ArrayList<>();
+ for (Path logPath : logPaths) {
+ LogFileReader reader = new LogFileReader();
+ LogFileReaderContext readerContext =
+ new
LogFileReaderContext(conf).setFileSystem(localFs).setFilePath(logPath);
+ reader.init(readerContext);
+
+ LogFile.Record record;
+ while ((record = reader.next()) != null) {
+ readRecords.add(record);
+ }
+ reader.close();
+ }
+
+ // Verify we have the expected number of records.
+ assertEquals("Total number of records mismatch", TOTAL_RECORDS,
readRecords.size());
+
+ // Verify each record matches the original. This confirms the total
ordering of all records
+ // in all files.
+ for (int i = 0; i < TOTAL_RECORDS; i++) {
+ LogFileTestUtil.assertRecordEquals("Record mismatch at index " + i,
originalRecords.get(i),
+ readRecords.get(i));
+ }
+ }
+
+ /**
+ * Tests reading records after multiple rotations with intermittent syncs.
If we do not sync when
+ * we roll a file, the in-flight batch is replayed into the new writer when
we do finally sync
+ * (with the new writer). Verifies that records can be correctly read even
when syncs are not
+ * performed before each rotation, ensuring data consistency.
+ */
+ @Test
+ public void testReadAfterMultipleRotationsWithReplay() throws Exception {
+ final String tableName = "TBLRAMRIS";
+ final int NUM_RECORDS_PER_ROTATION = 100;
+ final int NUM_ROTATIONS = 10;
+ final int TOTAL_RECORDS = NUM_RECORDS_PER_ROTATION * NUM_ROTATIONS;
+ List<LogFile.Record> originalRecords = new ArrayList<>();
+ List<Path> logPaths = new ArrayList<>();
+
+ // Write records across multiple rotations, only syncing 50% of the time.
+ for (int rotation = 0; rotation < NUM_ROTATIONS; rotation++) {
+ // Get the path of the current log file.
+ Path logPath = logWriter.getWriter().getContext().getFilePath();
+ logPaths.add(logPath);
+
+ for (int i = 0; i < NUM_RECORDS_PER_ROTATION; i++) {
+ int commitId = (rotation * NUM_RECORDS_PER_ROTATION) + i;
+ LogFile.Record record =
+ LogFileTestUtil.newPutRecord(tableName, commitId, "row" + commitId,
commitId, 1);
+ originalRecords.add(record);
+ logWriter.append(record.getHBaseTableName(), record.getCommitId(),
record.getMutation());
+ }
+
+ // Only sync 50% of the time before rotation. To ensure we sync on the
last file
+ // we are going to write, use 'rotation % 2 == 1' instead of 'rotation %
2 == 0'.
+ if (rotation % 2 == 1) {
+ logWriter.sync(); // Sync to commit the appends to the current writer.
+ }
+ // Force a rotation to close the current writer.
+ logWriter.rotateLog(ReplicationLog.RotationReason.SIZE);
+ }
+
+ // Verify all log files exist
+ for (Path logPath : logPaths) {
+ assertTrue("Log file should exist: " + logPath, localFs.exists(logPath));
+ }
+
+ // Read and verify all records from each log file, tracking unique records
and duplicates.
+ Set<LogFile.Record> uniqueRecords = new HashSet<>();
+ List<LogFile.Record> allReadRecords = new ArrayList<>();
+
+ for (Path logPath : logPaths) {
+ LogFileReader reader = new LogFileReader();
+ LogFileReaderContext readerContext =
+ new
LogFileReaderContext(conf).setFileSystem(localFs).setFilePath(logPath);
+ reader.init(readerContext);
+ LogFile.Record record;
+ while ((record = reader.next()) != null) {
+ allReadRecords.add(record);
+ uniqueRecords.add(record);
+ }
+ reader.close();
+ }
+
+ // Print statistics about duplicates for informational purposes.
+ LOG.info("{} total records across all files", allReadRecords.size());
+ LOG.info("{} unique records", uniqueRecords.size());
+ LOG.info("{} duplicate records", allReadRecords.size() -
uniqueRecords.size());
+
+ // Verify we have all the expected unique records
+ assertEquals("Number of unique records mismatch", TOTAL_RECORDS,
uniqueRecords.size());
+ }
+
+ /**
+ * Tests behavior when a RuntimeException occurs during writer.getLength() in shouldRotate().
+ * Verifies that the system properly handles critical errors by closing the log and preventing
+ * further operations.
+ */
+ @Test
+ public void testRuntimeExceptionDuringLengthCheck() throws Exception {
+ final String tableName = "TBLRDL";
+ final long commitId = 1L;
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+ // Get the inner writer
+ LogFileWriter innerWriter = logWriter.getWriter();
+ assertNotNull("Writer should not be null", innerWriter);
+
+ // Configure writer to throw RuntimeException on getLength()
+ doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).getLength();
+
+ // Append data. This should trigger the LogExceptionHandler, which will close logWriter.
+ logWriter.append(tableName, commitId, put);
+ try {
+ logWriter.sync();
+ fail("Should have thrown IOException because sync timed out");
+ } catch (IOException e) {
+ assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
+ }
+
+ // Verify that subsequent operations fail because the log is closed
+ try {
+ logWriter.append(tableName, commitId + 1, put);
+ fail("Should have thrown IOException because log is closed");
+ } catch (IOException e) {
+ assertTrue("Expected an IOException because log is closed",
+ e.getMessage().contains("Closed"));
+ }
+
+ // Verify that the inner writer was closed by the LogExceptionHandler
+ verify(innerWriter, times(1)).close();
+ }
+
+ /**
+ * Tests behavior when append() is called after closeOnError() has been triggered by a
+ * RuntimeException during a prior append(). Verifies that the system properly rejects append
+ * operations after being closed.
+ */
+ @Test
+ public void testAppendAfterCloseOnError() throws Exception {
+ final String tableName = "TBLAAE";
+ final long commitId = 1L;
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+ // Get the inner writer
+ LogFileWriter innerWriter = logWriter.getWriter();
+ assertNotNull("Writer should not be null", innerWriter);
+
+ // Configure writer to throw RuntimeException on append
+ doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).append(anyString(),
+ anyLong(), any(Mutation.class));
+
+ // Append data to trigger closeOnError()
+ logWriter.append(tableName, commitId, put);
+ try {
+ logWriter.sync();
+ fail("Should have thrown IOException because sync timed out");
+ } catch (IOException e) {
+ assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
+ }
+
+ // Verify that subsequent append operations fail because the log is closed
+ try {
+ logWriter.append(tableName, commitId, put);
+ fail("Should have thrown IOException because log is closed");
+ } catch (IOException e) {
+ assertTrue("Expected an IOException because log is closed",
+ e.getMessage().contains("Closed"));
+ }
+
+ // Verify that the inner writer was closed by the LogExceptionHandler
+ verify(innerWriter, times(1)).close();
+ }
+
+ /**
+ * Tests behavior when sync() is called after closeOnError() has been triggered by a
+ * RuntimeException during append(). Verifies that the system properly rejects sync operations
+ * after being closed.
+ */
+ @Test
+ public void testSyncAfterCloseOnError() throws Exception {
+ final String tableName = "TBLSAE";
+ final long commitId = 1L;
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+ // Get the inner writer
+ LogFileWriter innerWriter = logWriter.getWriter();
+ assertNotNull("Writer should not be null", innerWriter);
+
+ // Configure writer to throw RuntimeException on append
+ doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).append(anyString(),
+ anyLong(), any(Mutation.class));
+
+ // Append data to trigger closeOnError()
+ logWriter.append(tableName, commitId, put);
+ try {
+ logWriter.sync();
+ fail("Should have thrown IOException because sync timed out");
+ } catch (IOException e) {
+ assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
+ }
+
+ // Verify that subsequent sync operations fail because the log is closed
+ try {
+ logWriter.sync();
+ fail("Should have thrown IOException because log is closed");
+ } catch (IOException e) {
+ assertTrue("Expected an IOException because log is closed",
+ e.getMessage().contains("Closed"));
+ }
+
+ // Verify that the inner writer was closed by the LogExceptionHandler
+ verify(innerWriter, times(1)).close();
+ }
+
+ /**
+ * Tests race condition between LogRotationTask and LogEventHandler when both try to rotate the
+ * writer simultaneously. Verifies that despite concurrent rotation attempts, the log is only
+ * rotated once. Uses latches to ensure true concurrency and verify the sequence of operations.
+ */
+ @Test
+ public void testConcurrentRotationAttempts() throws Exception {
+ final String tableName = "TBLCR";
+ final long commitId = 1L;
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+ // Get the initial writer
+ LogFileWriter initialWriter = logWriter.getWriter();
+ assertNotNull("Initial writer should not be null", initialWriter);
+
+ // Create latches to control timing and track rotation attempts
+ final CountDownLatch rotationTaskStarted = new CountDownLatch(1);
+ final CountDownLatch rotationTaskCanProceed = new CountDownLatch(1);
+ final CountDownLatch eventHandlerStarted = new CountDownLatch(1);
+ final CountDownLatch eventHandlerCanProceed = new CountDownLatch(1);
+ final AtomicInteger rotationCount = new AtomicInteger(0);
+ final CountDownLatch bothAttemptsStarted = new CountDownLatch(2);
+
+ // Configure the rotation task to pause at specific points and track attempts
+ doAnswer(invocation -> {
+ rotationTaskStarted.countDown(); // Signal that rotation task has started
+ bothAttemptsStarted.countDown(); // Signal that this attempt has started
+ rotationTaskCanProceed.await(); // Wait for permission to proceed
+ rotationCount.incrementAndGet(); // Track this rotation attempt
+ return invocation.callRealMethod();
+ }).when(logWriter).rotateLog(ReplicationLog.RotationReason.TIME);
+
+ // Configure the event handler to pause at specific points
+ doAnswer(invocation -> {
+ eventHandlerStarted.countDown(); // Signal that event handler has started
+ bothAttemptsStarted.countDown(); // Signal that this attempt has started
+ eventHandlerCanProceed.await(); // Wait for permission to proceed
+ return invocation.callRealMethod();
+ }).when(logWriter).getWriter();
+
+ // Start a thread that will trigger rotation via the background task
+ Thread rotationThread = new Thread(() -> {
+ try {
+ // Force rotation by waiting for rotation time
+ Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ rotationThread.start();
+
+ // Start append operation in main thread
+ logWriter.append(tableName, commitId, put);
+
+ // Wait for both attempts to start - this ensures true concurrency
+ assertTrue("Both rotation attempts should start",
+ bothAttemptsStarted.await(5, TimeUnit.SECONDS));
+
+ // Verify both attempts have started before proceeding
+ assertEquals("Both attempts should have started", 0,
bothAttemptsStarted.getCount());
+
+ // Allow both operations to proceed simultaneously
+ eventHandlerCanProceed.countDown();
+ rotationTaskCanProceed.countDown();
+
+ // Wait for both operations to complete
+ rotationThread.join();
+
+ // Verify the final state
+ LogFileWriter finalWriter = logWriter.getWriter();
+ assertNotNull("Final writer should not be null", finalWriter);
+ assertTrue("Writer should have been rotated", finalWriter !=
initialWriter);
+
+ // Verify only one rotation actually occurred
+ assertEquals("Should have only one actual rotation", 1,
rotationCount.get());
+
+ // Verify all operations completed successfully
+ logWriter.sync();
+
+ // Verify the sequence of operations through the latches
+ assertTrue("Rotation task should have started",
rotationTaskStarted.getCount() == 0);
+ assertTrue("Event handler should have started",
eventHandlerStarted.getCount() == 0);
+ }
+
+ /**
+ * Tests race condition between LogEventHandler retry loop and LogRotationTask when both try to
+ * rotate the writer simultaneously. Verifies that despite concurrent rotation attempts during a
+ * retry scenario, the log is only rotated once. Uses latches to ensure true concurrency and
+ * verify the sequence of operations.
+ */
+ @Test
+ public void testConcurrentRotationDuringRetry() throws Exception {
+ final String tableName = "TBLCRR";
+ final long commitId = 1L;
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+ // Get the initial writer
+ LogFileWriter initialWriter = logWriter.getWriter();
+ assertNotNull("Initial writer should not be null", initialWriter);
+
+ // Create latches to control timing and track rotation attempts
+ final CountDownLatch retryStarted = new CountDownLatch(1);
+ final CountDownLatch retryCanProceed = new CountDownLatch(1);
+ final CountDownLatch rotationTaskStarted = new CountDownLatch(1);
+ final CountDownLatch rotationTaskCanProceed = new CountDownLatch(1);
+ final AtomicInteger rotationCount = new AtomicInteger(0);
+ final CountDownLatch bothAttemptsStarted = new CountDownLatch(2);
+
+ // Configure the initial writer to pause its append so the rotation task can contend with it
+ doAnswer(invocation -> {
+ retryStarted.countDown(); // Signal that retry has started
+ bothAttemptsStarted.countDown(); // Signal that this attempt has started
+ retryCanProceed.await(); // Wait for permission to proceed
+ return invocation.callRealMethod();
+ }).when(initialWriter).append(anyString(), anyLong(), any(Mutation.class));
+
+ // Configure the rotation task to pause at specific points and track attempts
+ doAnswer(invocation -> {
+ rotationTaskStarted.countDown(); // Signal that rotation task has started
+ bothAttemptsStarted.countDown(); // Signal that this attempt has started
+ rotationTaskCanProceed.await(); // Wait for permission to proceed
+ rotationCount.incrementAndGet(); // Track this rotation attempt
+ return invocation.callRealMethod();
+ }).when(logWriter).rotateLog(ReplicationLog.RotationReason.TIME);
+
+ // Start a thread that will trigger rotation via the background task
+ Thread rotationThread = new Thread(() -> {
+ try {
+ // Force rotation by waiting for rotation time
+ Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ rotationThread.start();
+
+ // Start append operation in main thread
+ logWriter.append(tableName, commitId, put);
+
+ // Wait for both attempts to start - this ensures true concurrency
+ assertTrue("Both rotation attempts should start",
+ bothAttemptsStarted.await(5, TimeUnit.SECONDS));
+
+ // Verify both attempts have started before proceeding
+ assertEquals("Both attempts should have started", 0,
bothAttemptsStarted.getCount());
+
+ // Allow both operations to proceed simultaneously
+ retryCanProceed.countDown();
+ rotationTaskCanProceed.countDown();
+
+ // Wait for both operations to complete
+ rotationThread.join();
+
+ // Verify the final state
+ LogFileWriter finalWriter = logWriter.getWriter();
+ assertNotNull("Final writer should not be null", finalWriter);
+ assertTrue("Writer should have been rotated", finalWriter !=
initialWriter);
+
+ // Verify only one rotation actually occurred
+ assertEquals("Should have only one actual rotation", 1,
rotationCount.get());
+
+ // Verify all operations completed successfully
+ logWriter.sync();
+
+ // Verify the sequence of operations through the latches
+ assertTrue("Retry should have started", retryStarted.getCount() == 0);
+ assertTrue("Rotation task should have started",
rotationTaskStarted.getCount() == 0);
+ }
+
+ /**
+ * Tests that multiple sync requests are consolidated into a single sync operation on the inner
+ * writer when they occur in quick succession. Verifies that the Disruptor batching and
+ * LogEventHandler processing correctly consolidate multiple sync requests into a single sync
+ * operation, while still completing all sync futures successfully.
+ */
+ @Test
+ public void testSyncConsolidation() throws Exception {
+ final String tableName = "TBLSC";
+ final Mutation put1 = LogFileTestUtil.newPut("row1", 1, 1);
+ final long commitId1 = 1L;
+ final Mutation put2 = LogFileTestUtil.newPut("row2", 2, 1);
+ final long commitId2 = 2L;
+ final Mutation put3 = LogFileTestUtil.newPut("row3", 3, 1);
+ final long commitId3 = 3L;
+
+ LogFileWriter innerWriter = logWriter.getWriter();
+ assertNotNull("Inner writer should not be null", innerWriter);
+
+ // Configure writer to briefly hold up the LogEventHandler upon first append.
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Thread.sleep(50); // Delay to allow multiple events to be posted
+ return invocation.callRealMethod();
+ }
+ }).when(innerWriter).append(eq(tableName), eq(commitId1), eq(put1));
+
+ // Post appends and three syncs in quick succession. The first append will be delayed long
+ // enough for the three syncs to appear in a single Disruptor batch. Then they should all
+ // be consolidated into a single sync.
+ logWriter.append(tableName, commitId1, put1);
+ logWriter.sync();
+ logWriter.append(tableName, commitId2, put2);
+ logWriter.sync();
+ logWriter.append(tableName, commitId3, put3);
+ logWriter.sync();
+
+ // Verify the sequence of operations on the inner writer: the three appends, then exactly
+ // one sync.
+ InOrder inOrder = Mockito.inOrder(innerWriter);
+ inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId1), eq(put1));
+ inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId2), eq(put2));
+ inOrder.verify(innerWriter, times(1)).append(eq(tableName), eq(commitId3), eq(put3));
+ inOrder.verify(innerWriter, times(1)).sync(); // Only one sync should be called
+ }
+
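+ /**
+ * ReplicationLog subclass used by these tests: it wraps each newly created LogFileWriter in a
+ * Mockito spy so individual tests can stub and verify writer interactions, and supplies a fresh
+ * metrics source per instance.
+ */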
+ static class TestableReplicationLog extends ReplicationLog {
+
+ protected TestableReplicationLog(Configuration conf, ServerName serverName) {
+ super(conf, serverName);
+ }
+
+ @Override
+ protected LogFileWriter createNewWriter(FileSystem fs, URI url) throws IOException {
+ return spy(super.createNewWriter(fs, url));
+ }
+
+ @Override
+ protected MetricsReplicationLogSource createMetricsSource() {
+ return new MetricsReplicationLogSourceImpl();
+ }
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index f41f7dad75..24fc8e655f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,10 +98,10 @@
<jackson-bom.version>2.18.4.1</jackson-bom.version>
<netty-bom.version>4.1.126.Final</netty-bom.version>
<antlr.version>3.5.2</antlr.version>
+ <disruptor.version>3.3.6</disruptor.version>
<!-- Only used for tests with HBase 2.1-2.4 -->
<reload4j.version>1.2.19</reload4j.version>
<log4j2.version>2.18.0</log4j2.version>
- <disruptor.version>3.3.6</disruptor.version>
<slf4j.version>1.7.36</slf4j.version>
<!-- com.google repo will be used except on Aarch64 platform. -->
<protobuf.group>com.google.protobuf</protobuf.group>