This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 389ff74a74 PHOENIX-7640 Refactor ReplicationLog for HA Groups (#2197)
389ff74a74 is described below
commit 389ff74a74979d3e45f43362f076b4727eae6776
Author: Andrew Purtell <[email protected]>
AuthorDate: Thu Dec 18 11:12:24 2025 -0800
PHOENIX-7640 Refactor ReplicationLog for HA Groups (#2197)
Conflicts:
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
---
.../phoenix/replication/ReplicationLogGroup.java | 408 ++++++++++++++
...tionLog.java => ReplicationLogGroupWriter.java} | 538 ++++--------------
.../phoenix/replication/StandbyLogGroupWriter.java | 136 +++++
.../replication/StoreAndForwardLogGroupWriter.java | 74 +++
....java => MetricsReplicationLogGroupSource.java} | 14 +-
...a => MetricsReplicationLogGroupSourceImpl.java} | 27 +-
...onLogTest.java => ReplicationLogGroupTest.java} | 603 +++++++++------------
7 files changed, 1017 insertions(+), 783 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
new file mode 100644
index 0000000000..80f15bbbf0
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogGroupSource;
+import org.apache.phoenix.replication.metrics.MetricsReplicationLogGroupSourceImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ReplicationLogGroup manages a group of replication logs for a given HA Group.
+ * <p>
+ * This class provides an API for replication operations and delegates to either synchronous
+ * replication (StandbyLogGroupWriter) or store-and-forward replication
+ * (StoreAndForwardLogGroupWriter) based on the current replication mode.
+ * <p>
+ * Key features:
+ * <ul>
+ * <li>Manages multiple replication logs for an HA Group</li>
+ * <li>Provides append() and sync() API for higher layers</li>
+ * <li>Delegates to appropriate writer implementation based on replication mode</li>
+ * <li>Thread-safe operations</li>
+ * </ul>
+ * <p>
+ * The class delegates actual replication work to implementations of ReplicationLogGroupWriter:
+ * <ul>
+ * <li>StandbyLogGroupWriter: Synchronous replication to standby cluster</li>
+ * <li>StoreAndForwardLogGroupWriter: Local storage with forwarding when available</li>
+ * </ul>
+ */
+public class ReplicationLogGroup {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogGroup.class);
+
+ // Configuration constants from original ReplicationLog
+ public static final String REPLICATION_STANDBY_HDFS_URL_KEY =
+ "phoenix.replication.log.standby.hdfs.url";
+ public static final String REPLICATION_FALLBACK_HDFS_URL_KEY =
+ "phoenix.replication.log.fallback.hdfs.url";
+ public static final String REPLICATION_NUM_SHARDS_KEY = "phoenix.replication.log.shards";
+ public static final int DEFAULT_REPLICATION_NUM_SHARDS = 1000;
+ public static final int MAX_REPLICATION_NUM_SHARDS = 100000;
+ public static final String REPLICATION_LOG_ROTATION_TIME_MS_KEY =
+ "phoenix.replication.log.rotation.time.ms";
+ public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 1000L;
+ public static final String REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY =
+ "phoenix.replication.log.rotation.size.bytes";
+ public static final long DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES = 256 * 1024 * 1024L;
+ public static final String REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY =
+ "phoenix.replication.log.rotation.size.percentage";
+ public static final double DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE = 0.95;
+ public static final String REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY =
+ "phoenix.replication.log.compression";
+ public static final String DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM = "NONE";
+ public static final String REPLICATION_LOG_RINGBUFFER_SIZE_KEY =
+ "phoenix.replication.log.ringbuffer.size";
+ public static final int DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE = 1024 * 32;
+ public static final String REPLICATION_LOG_SYNC_TIMEOUT_KEY =
+ "phoenix.replication.log.sync.timeout.ms";
+ public static final long DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT = 1000 * 30;
+ public static final String REPLICATION_LOG_SYNC_RETRIES_KEY =
+ "phoenix.replication.log.sync.retries";
+ public static final int DEFAULT_REPLICATION_LOG_SYNC_RETRIES = 5;
+ public static final String REPLICATION_LOG_ROTATION_RETRIES_KEY =
+ "phoenix.replication.log.rotation.retries";
+ public static final int DEFAULT_REPLICATION_LOG_ROTATION_RETRIES = 5;
+ public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
+ "phoenix.replication.log.retry.delay.ms";
+ public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
+
+ public static final String SHARD_DIR_FORMAT = "%05d";
+ public static final String FILE_NAME_FORMAT = "%d-%s.plog";
+
+ /** Cache of ReplicationLogGroup instances by HA Group ID */
+ protected static final ConcurrentHashMap<String, ReplicationLogGroup> INSTANCES =
+ new ConcurrentHashMap<>();
+
+ protected final Configuration conf;
+ protected final ServerName serverName;
+ protected final String haGroupName;
+ protected ReplicationLogGroupWriter remoteWriter;
+ protected ReplicationLogGroupWriter localWriter;
+ protected ReplicationMode mode;
+ protected volatile boolean closed = false;
+ protected final MetricsReplicationLogGroupSource metrics;
+
+ /**
+ * Tracks the current replication mode of the ReplicationLog.
+ * <p>
+ * The replication mode determines how mutations are handled:
+ * <ul>
+ * <li>SYNC: Normal operation where mutations are written directly to the standby cluster's HDFS.
+ * This is the default and primary mode of operation.</li>
+ * <li>STORE_AND_FORWARD: Fallback mode when the standby cluster's HDFS is unavailable. Mutations
+ * are stored locally and will be forwarded when connectivity is restored.</li>
+ * <li>SYNC_AND_FORWARD: Transitional mode where new mutations are written directly to the standby
+ * cluster while concurrently draining the local queue of previously stored mutations.</li>
+ * </ul>
+ * <p>
+ * Mode transitions occur automatically based on the availability of the standby cluster's HDFS
+ * and the state of the local mutation queue.
+ */
+ protected enum ReplicationMode {
+ /**
+ * Normal operation where mutations are written directly to the standby cluster's HDFS. This is
+ * the default and primary mode of operation.
+ */
+ SYNC,
+
+ /**
+ * Fallback mode when the standby cluster's HDFS is unavailable. Mutations are stored locally
+ * and will be forwarded when connectivity is restored.
+ */
+ STORE_AND_FORWARD,
+
+ /**
+ * Transitional mode where new mutations are written directly to the standby cluster while
+ * concurrently draining the local queue of previously stored mutations. This mode is entered
+ * when connectivity to the standby cluster is restored and there are still mutations in the
+ * local queue.
+ */
+ SYNC_AND_FORWARD;
+ }
+
+ /**
+ * Get or create a ReplicationLogGroup instance for the given HA Group.
+ * @param conf Configuration object
+ * @param serverName The server name
+ * @param haGroupName The HA Group name
+ * @return ReplicationLogGroup instance
+ * @throws RuntimeException if initialization fails
+ */
+ public static ReplicationLogGroup get(Configuration conf, ServerName serverName,
+ String haGroupName) {
+ return INSTANCES.computeIfAbsent(haGroupName, k -> {
+ try {
+ ReplicationLogGroup group = new ReplicationLogGroup(conf, serverName, haGroupName);
+ group.init();
+ return group;
+ } catch (IOException e) {
+ LOG.error("Failed to create ReplicationLogGroup for HA Group: {}", haGroupName, e);
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ /**
+ * Protected constructor for ReplicationLogGroup.
+ * @param conf Configuration object
+ * @param serverName The server name
+ * @param haGroupName The HA Group name
+ */
+ protected ReplicationLogGroup(Configuration conf, ServerName serverName, String haGroupName) {
+ this.conf = conf;
+ this.serverName = serverName;
+ this.haGroupName = haGroupName;
+ this.metrics = createMetricsSource();
+ }
+
+ /**
+ * Initialize the ReplicationLogGroup by creating the appropriate writer implementation.
+ * @throws IOException if initialization fails
+ */
+ protected void init() throws IOException {
+ // We need the local writer created first if we intend to fall back to it should the init
+ // of the remote writer fail.
+ localWriter = createLocalWriter();
+ // Initialize the remote writer and set the mode to SYNC. TODO: switch instead of set
+ mode = ReplicationMode.SYNC;
+ remoteWriter = createRemoteWriter();
+ // TODO: Switch the initial mode to STORE_AND_FORWARD if the remote writer fails to
+ // initialize.
+ LOG.info("Started ReplicationLogGroup for HA Group: {}", haGroupName);
+ }
+
+ /**
+ * Get the name for this HA Group.
+ * @return The name for this HA Group
+ */
+ public String getHaGroupName() {
+ return haGroupName;
+ }
+
+ protected Configuration getConfiguration() {
+ return conf;
+ }
+
+ protected ServerName getServerName() {
+ return serverName;
+ }
+
+ /**
+ * Append a mutation to the replication log group. This operation is normally non-blocking unless
+ * the ring buffer is full.
+ * @param tableName The name of the HBase table the mutation applies to
+ * @param commitId The commit identifier (e.g., SCN) associated with the mutation
+ * @param mutation The HBase Mutation (Put or Delete) to be logged
+ * @throws IOException If the operation fails
+ */
+ public void append(String tableName, long commitId, Mutation mutation) throws IOException {
+ if (closed) {
+ throw new IOException("Closed");
+ }
+ long startTime = System.nanoTime();
+ try {
+ switch (mode) {
+ case SYNC:
+ // In sync mode, we only write to the remote writer.
+ try {
+ remoteWriter.append(tableName, commitId, mutation);
+ } catch (IOException e) {
+ // TODO: If the remote writer fails, we must switch to store and forward.
+ LOG.warn("Mode switching not implemented");
+ throw e;
+ }
+ break;
+ case SYNC_AND_FORWARD:
+ // In sync and forward mode, we write to only the remote writer, while in the
+ // background we are draining the local queue.
+ try {
+ remoteWriter.append(tableName, commitId, mutation);
+ } catch (IOException e) {
+ // TODO: If the remote writer fails again, we must switch back to store and
+ // forward.
+ LOG.warn("Mode switching not implemented");
+ throw e;
+ }
+ break;
+ case STORE_AND_FORWARD:
+ // In store and forward mode, we append to the local writer. If we fail it's a
+ // critical failure.
+ localWriter.append(tableName, commitId, mutation);
+ // TODO: Probe the state of the remoteWriter. Can we switch back?
+ // TODO: This suggests the ReplicationLogGroupWriter interface should have a status
+ // probe API.
+ break;
+ default:
+ throw new IllegalStateException("Invalid replication mode: " + mode);
+ }
+ } finally {
+ metrics.updateAppendTime(System.nanoTime() - startTime);
+ }
+ }
+
+ /**
+ * Ensure all previously appended records are durably persisted. This method blocks until the sync
+ * operation completes or fails.
+ * @throws IOException If the sync operation fails
+ */
+ public void sync() throws IOException {
+ if (closed) {
+ throw new IOException("Closed");
+ }
+ long startTime = System.nanoTime();
+ try {
+ switch (mode) {
+ case SYNC:
+ // In sync mode, we only write to the remote writer.
+ try {
+ remoteWriter.sync();
+ } catch (IOException e) {
+ // TODO: If the remote writer fails, we must switch to store and forward.
+ LOG.warn("Mode switching not implemented");
+ throw e;
+ }
+ break;
+ case SYNC_AND_FORWARD:
+ // In sync and forward mode, we write to only the remote writer, while in the
+ // background we are draining the local queue.
+ try {
+ remoteWriter.sync();
+ } catch (IOException e) {
+ // TODO: If the remote writer fails again, we must switch back to store and
+ // forward.
+ LOG.warn("Mode switching not implemented");
+ throw e;
+ }
+ break;
+ case STORE_AND_FORWARD:
+ // In store and forward mode, we sync the local writer. If we fail it's a critical
+ // failure.
+ localWriter.sync();
+ // TODO: Probe the state of the remoteWriter. Can we switch back?
+ // TODO: This suggests the ReplicationLogGroupWriter interface should have a
+ // status probe API.
+ break;
+ default:
+ throw new IllegalStateException("Invalid replication mode: " + mode);
+ }
+ } finally {
+ metrics.updateSyncTime(System.nanoTime() - startTime);
+ }
+ }
+
+ /**
+ * Check if this ReplicationLogGroup is closed.
+ * @return true if closed, false otherwise
+ */
+ public boolean isClosed() {
+ return closed;
+ }
+
+ /**
+ * Close the ReplicationLogGroup and all associated resources. This method is thread-safe and can
+ * be called multiple times.
+ */
+ public void close() {
+ if (closed) {
+ return;
+ }
+ synchronized (this) {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ // Remove from instances cache
+ INSTANCES.remove(haGroupName);
+ // Close the writers, remote first. If there are any problems closing the remote writer
+ // the pending writes will be sent to the local writer instead, during the appropriate
+ // mode switch.
+ closeWriter(remoteWriter);
+ closeWriter(localWriter);
+ metrics.close();
+ LOG.info("Closed ReplicationLogGroup for HA Group: {}", haGroupName);
+ }
+ }
+
+ /**
+ * Switch the replication mode.
+ * @param mode The new replication mode
+ * @param reason The reason for the mode switch
+ * @throws IOException If the mode switch fails
+ */
+ public void switchMode(ReplicationMode mode, Throwable reason) throws IOException {
+ // TODO: Implement mode switching guardrails and transition logic.
+ // TODO: We will be interacting with the HA Group Store to switch modes.
+
+ // TODO: Drain the disruptor ring from the remote writer to the local writer when making
+ // transitions from SYNC or SYNC_AND_FORWARD to STORE_AND_FORWARD.
+
+ throw new UnsupportedOperationException("Mode switching is not implemented");
+ }
+
+ /** Get the current metrics source for monitoring operations. */
+ public MetricsReplicationLogGroupSource getMetrics() {
+ return metrics;
+ }
+
+ /** Create a new metrics source for monitoring operations. */
+ protected MetricsReplicationLogGroupSource createMetricsSource() {
+ return new MetricsReplicationLogGroupSourceImpl(haGroupName);
+ }
+
+ /** Close the given writer. */
+ protected void closeWriter(ReplicationLogGroupWriter writer) {
+ if (writer != null) {
+ writer.close();
+ }
+ }
+
+ /** Create the remote (synchronous) writer. Mainly for tests. */
+ protected ReplicationLogGroupWriter createRemoteWriter() throws IOException {
+ ReplicationLogGroupWriter writer = new StandbyLogGroupWriter(this);
+ writer.init();
+ return writer;
+ }
+
+ /** Create the local (store and forward) writer. Mainly for tests. */
+ protected ReplicationLogGroupWriter createLocalWriter() throws IOException {
+ ReplicationLogGroupWriter writer = new StoreAndForwardLogGroupWriter(this);
+ writer.init();
+ return writer;
+ }
+
+ /** Returns the currently active writer. Mainly for tests. */
+ protected ReplicationLogGroupWriter getActiveWriter() {
+ switch (mode) {
+ case SYNC:
+ return remoteWriter;
+ case SYNC_AND_FORWARD:
+ return remoteWriter;
+ case STORE_AND_FORWARD:
+ return localWriter;
+ default:
+ throw new IllegalStateException("Invalid replication mode: " + mode);
+ }
+ }
+}
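For reviewers, a minimal caller-side sketch of the API introduced in ReplicationLogGroup.java above. The HA group name, server identity, table, commit id, and cell values are illustrative only; get(), append(), sync(), and close() are the methods added by this change:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.replication.ReplicationLogGroup;

    public class ReplicationLogGroupUsageSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // Illustrative server identity; a region server would pass its own ServerName.
        ServerName serverName = ServerName.valueOf("rs1.example.com", 16020, System.currentTimeMillis());
        // Instances are cached per HA group name; repeated calls return the same group.
        ReplicationLogGroup logGroup = ReplicationLogGroup.get(conf, serverName, "ha-group-1");
        try {
          Put put = new Put(Bytes.toBytes("row1"));
          put.addColumn(Bytes.toBytes("0"), Bytes.toBytes("q"), Bytes.toBytes("v"));
          // append() is normally non-blocking; the ring buffer applies backpressure only when full.
          logGroup.append("MY_TABLE", 12345L, put);
          // sync() blocks until everything appended so far is durably persisted by the active writer.
          logGroup.sync();
        } finally {
          logGroup.close();
        }
      }
    }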
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
similarity index 60%
rename from phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
rename to phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
index 3a2d9567a0..b76b8fab34 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
@@ -27,11 +27,9 @@ import com.lmax.disruptor.dsl.ProducerType;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -41,59 +39,27 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.phoenix.replication.log.LogFileWriter;
-import org.apache.phoenix.replication.log.LogFileWriterContext;
-import org.apache.phoenix.replication.metrics.MetricsReplicationLogSource;
-import org.apache.phoenix.replication.metrics.MetricsReplicationLogSourceImpl;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
- * The ReplicationLog implements a high-performance logging system for mutations that need to be
- * replicated to another cluster.
+ * Base class for replication log group writers.
* <p>
- * Key features:
- * <ul>
- * <li>Asynchronous append operations with batching for high throughput</li>
- * <li>Controlled blocking sync operations with timeout and retry logic</li>
- * <li>Automatic log rotation based on time and size thresholds</li>
- * <li>Sharded directory structure for better HDFS performance</li>
- * <li>Metrics for monitoring</li>
- * <li>Fail-stop behavior on critical errors</li>
- * </ul>
- * <p>
- * The class supports three replication modes (though currently only SYNC is implemented):
- * <ul>
- * <li>SYNC: Direct writes to standby cluster (default)</li>
- * <li>STORE_AND_FORWARD: Local storage when standby is unavailable</li>
- * <li>SYNC_AND_FORWARD: Concurrent direct writes and queue draining</li>
- * </ul>
- * <p>
- * Key configuration properties:
- * <ul>
- * <li>{@link #REPLICATION_STANDBY_HDFS_URL_KEY}: URL for standby cluster HDFS</li>
- * <li>{@link #REPLICATION_FALLBACK_HDFS_URL_KEY}: URL for local fallback storage</li>
- * <li>{@link #REPLICATION_NUM_SHARDS_KEY}: Number of shard directories</li>
- * <li>{@link #REPLICATION_LOG_ROTATION_TIME_MS_KEY}: Time-based rotation interval</li>
- * <li>{@link #REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY}: Size-based rotation threshold</li>
- * <li>{@link #REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY}: Compression algorithm</li>
- * </ul>
- * <p>
- * This class is intended to be thread-safe.
+ * This abstract class contains most of the common functionality for managing replication logs
+ * including the disruptor ring buffer, log rotation, file system management, and metrics. Concrete
+ * implementations provide specific replication behavior (synchronous vs store-and-forward).
* <p>
* Architecture Overview:
*
* <pre>
* ┌──────────────────────────────────────────────────────────────────────┐
- * │ ReplicationLog │
+ * │ ReplicationLogGroup │
* │ │
* │ ┌─────────────┐ ┌────────────────────────────────────────────┐ │
* │ │ │ │ │ │
@@ -138,129 +104,26 @@ import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFac
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value = { "EI_EXPOSE_REP", "EI_EXPOSE_REP2", "MS_EXPOSE_REP" },
justification = "Intentional")
-public class ReplicationLog {
-
- /** The path on the standby HDFS where log files should be written. (The "IN" directory.) */
- public static final String REPLICATION_STANDBY_HDFS_URL_KEY =
- "phoenix.replication.log.standby.hdfs.url";
- /**
- * The path on the active HDFS where log files should be written when we have fallen back to store
- * and forward mode. (The "OUT" directory.)
- */
- public static final String REPLICATION_FALLBACK_HDFS_URL_KEY =
- "phoenix.replication.log.fallback.hdfs.url";
- /**
- * The number of shards (subfolders) to maintain in the "IN" directory.
- * <p>
- * Shard directories have the format shard-NNNNN, e.g. shard-00001. The maximum value is 100000.
- */
- public static final String REPLICATION_NUM_SHARDS_KEY = "phoenix.replication.log.shards";
- public static final int DEFAULT_REPLICATION_NUM_SHARDS = 1000;
- public static final int MAX_REPLICATION_NUM_SHARDS = 100000;
- /** Replication log rotation time trigger, default is 1 minute */
- public static final String REPLICATION_LOG_ROTATION_TIME_MS_KEY =
- "phoenix.replication.log.rotation.time.ms";
- public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 1000L;
- /** Replication log rotation size trigger, default is 256 MB */
- public static final String REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY =
- "phoenix.replication.log.rotation.size.bytes";
- public static final long DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES = 256 * 1024 * 1024L;
- /** Replication log rotation size trigger percentage, default is 0.95 */
- public static final String REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY =
- "phoenix.replication.log.rotation.size.percentage";
- public static final double DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE = 0.95;
- /** Replication log compression, default is "NONE" */
- public static final String REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY =
- "phoenix.replication.log.compression";
- public static final String DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM = "NONE";
- public static final String REPLICATION_LOG_RINGBUFFER_SIZE_KEY =
- "phoenix.replication.log.ringbuffer.size";
- public static final int DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE = 1024 * 32; // Too big?
- public static final String REPLICATION_LOG_SYNC_TIMEOUT_KEY =
- "phoenix.replication.log.sync.timeout.ms";
- public static final long DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT = 1000 * 30;
- public static final String REPLICATION_LOG_SYNC_RETRIES_KEY =
- "phoenix.replication.log.sync.retries";
- public static final int DEFAULT_REPLICATION_LOG_SYNC_RETRIES = 5;
- public static final String REPLICATION_LOG_ROTATION_RETRIES_KEY =
- "phoenix.replication.log.rotation.retries";
- public static final int DEFAULT_REPLICATION_LOG_ROTATION_RETRIES = 5;
- public static final String REPLICATION_LOG_RETRY_DELAY_MS_KEY =
- "phoenix.replication.log.retry.delay.ms";
- public static final long DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS = 100L;
-
- public static final String SHARD_DIR_FORMAT = "shard%05d";
- public static final String FILE_NAME_FORMAT = "%d-%s.plog";
-
- static final byte EVENT_TYPE_DATA = 0;
- static final byte EVENT_TYPE_SYNC = 1;
-
- static final Logger LOG = LoggerFactory.getLogger(ReplicationLog.class);
+public abstract class ReplicationLogGroupWriter {
- protected static volatile ReplicationLog instance;
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogGroupWriter.class);
- protected final Configuration conf;
- protected final ServerName serverName;
- protected FileSystem standbyFs;
- protected FileSystem fallbackFs; // For store-and-forward (future use)
- protected int numShards;
- protected URI standbyUrl;
- protected URI fallbackUrl; // For store-and-forward (future use)
+ protected final ReplicationLogGroup logGroup;
protected final long rotationTimeMs;
protected final long rotationSizeBytes;
protected final int maxRotationRetries;
protected final Compression.Algorithm compression;
+ protected final int ringBufferSize;
+ protected final long syncTimeoutMs;
protected final ReentrantLock lock = new ReentrantLock();
- protected volatile LogFileWriter currentWriter; // Current writer
+ protected volatile LogFileWriter currentWriter;
protected final AtomicLong lastRotationTime = new AtomicLong();
protected final AtomicLong writerGeneration = new AtomicLong();
protected final AtomicLong rotationFailures = new AtomicLong(0);
protected ScheduledExecutorService rotationExecutor;
- protected final int ringBufferSize;
- protected final long syncTimeoutMs;
protected Disruptor<LogEvent> disruptor;
protected RingBuffer<LogEvent> ringBuffer;
- protected final ConcurrentHashMap<Path, Object> shardMap = new ConcurrentHashMap<>();
- protected final MetricsReplicationLogSource metrics;
- protected volatile boolean isClosed = false;
-
- /**
- * Tracks the current replication mode of the ReplicationLog.
- * <p>
- * The replication mode determines how mutations are handled:
- * <ul>
- * <li>SYNC: Normal operation where mutations are written directly to the standby cluster's HDFS.
- * This is the default and primary mode of operation.</li>
- * <li>STORE_AND_FORWARD: Fallback mode when the standby cluster's HDFS is unavailable. Mutations
- * are stored locally and will be forwarded when connectivity is restored.</li>
- * <li>SYNC_AND_FORWARD: Transitional mode where new mutations are written directly to the standby
- * cluster while concurrently draining the local queue of previously stored mutations.</li>
- * </ul>
- * <p>
- * Mode transitions occur automatically based on the availability of the standby cluster's HDFS
- * and the state of the local mutation queue.
- */
- protected enum ReplicationMode {
- /**
- * Normal operation where mutations are written directly to the standby cluster's HDFS. This is
- * the default and primary mode of operation.
- */
- SYNC,
-
- /**
- * Fallback mode when the standby cluster's HDFS is unavailable. Mutations are stored locally
- * and will be forwarded when connectivity is restored.
- */
- STORE_AND_FORWARD,
-
- /**
- * Transitional mode where new mutations are written directly to the standby cluster while
- * concurrently draining the local queue of previously stored mutations. This mode is entered
- * when connectivity to the standby cluster is restored while there are still mutations in the
- * local queue.
- */
- SYNC_AND_FORWARD;
- }
+ protected volatile boolean closed = false;
/** The reason for requesting a log rotation. */
protected enum RotationReason {
@@ -272,89 +135,28 @@ public class ReplicationLog {
ERROR;
}
- /**
- * The current replication mode. Always SYNC for now.
- * <p>
- * TODO: Implement mode transitions to STORE_AND_FORWARD when standby becomes unavailable.
- * <p>
- * TODO: Implement mode transitions to SYNC_AND_FORWARD when draining queue.
- */
- protected volatile ReplicationMode currentMode = ReplicationMode.SYNC;
-
- // TODO: Add configuration keys for store-and-forward behavior
- // - Maximum retry attempts before switching to store-and-forward
- // - Retry delay between attempts
- // - Queue drain batch size
- // - Queue drain interval
-
- // TODO: Add state tracking fields
- // - Queue of pending changes when in store-and-forward mode
- // - Timestamp of last successful standby write
- // - Error count for tracking consecutive failures
-
- // TODO: Add methods for state transitions
- // - switchToStoreAndForward() - Called when standby becomes unavailable
- // - switchToSync() - Called when standby becomes available again
- // - drainQueue() - Background task to process queued changes
-
- // TODO: Enhance error handling in LogEventHandler
- // - Track consecutive failures
- // - Switch to store-and-forward after max retries
-
- // TODO: Implement queue management for store-and-forward mode
- // - Implement queue persistence to handle RegionServer restarts
- // - Implement queue draining when in SYNC_AND_FORWARD state
- // - Implement automatic recovery from temporary network issues
- // - Add configurable thresholds for switching to store-and-forward based on write latency
- // - Add circuit breaker pattern to prevent overwhelming the standby cluster
- // - Add queue size limits and backpressure mechanisms
- // - Add queue metrics for monitoring (queue size, oldest entry age, etc.)
-
- // TODO: Enhance metrics for replication health monitoring
- // - Add metrics for replication lag between active and standby
- // - Track time spent in each replication mode (SYNC, STORE_AND_FORWARD, SYNC_AND_FORWARD)
- // - Monitor queue drain rate and backlog size
- // - Track consecutive failures and mode transition events
-
- /**
- * Gets the singleton instance of the ReplicationLogManager using the lazy initializer pattern.
- * Initializes the instance if it hasn't been created yet.
- * @param conf Configuration object.
- * @param serverName The server name.
- * @return The singleton ReplicationLogManager instance.
- * @throws IOException If initialization fails.
- */
- public static ReplicationLog get(Configuration conf, ServerName serverName) throws IOException {
- if (instance == null) {
- synchronized (ReplicationLog.class) {
- if (instance == null) {
- // Complete initialization before assignment
- ReplicationLog logManager = new ReplicationLog(conf, serverName);
- logManager.init();
- instance = logManager;
- }
- }
- }
- return instance;
- }
-
- protected ReplicationLog(Configuration conf, ServerName serverName) {
- this.conf = conf;
- this.serverName = serverName;
- this.rotationTimeMs =
- conf.getLong(REPLICATION_LOG_ROTATION_TIME_MS_KEY, DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS);
- long rotationSize = conf.getLong(REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
- DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES);
- double rotationSizePercent = conf.getDouble(REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY,
- DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE);
+ protected static final byte EVENT_TYPE_DATA = 0;
+ protected static final byte EVENT_TYPE_SYNC = 1;
+
+ protected ReplicationLogGroupWriter(ReplicationLogGroup logGroup) {
+ this.logGroup = logGroup;
+ Configuration conf = logGroup.getConfiguration();
+ this.rotationTimeMs = conf.getLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_TIME_MS_KEY,
+ ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS);
+ long rotationSize = conf.getLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
+ ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES);
+ double rotationSizePercent =
+ conf.getDouble(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY,
+ ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE);
this.rotationSizeBytes = (long) (rotationSize * rotationSizePercent);
- this.maxRotationRetries =
- conf.getInt(REPLICATION_LOG_ROTATION_RETRIES_KEY, DEFAULT_REPLICATION_LOG_ROTATION_RETRIES);
- this.numShards = conf.getInt(REPLICATION_NUM_SHARDS_KEY, DEFAULT_REPLICATION_NUM_SHARDS);
- String compressionName = conf.get(REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY,
- DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM);
+ this.maxRotationRetries = conf.getInt(ReplicationLogGroup.REPLICATION_LOG_ROTATION_RETRIES_KEY,
+ ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES);
+ String compressionName = conf.get(ReplicationLogGroup.REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY,
+ ReplicationLogGroup.DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM);
Compression.Algorithm compression = Compression.Algorithm.NONE;
- if (!DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM.equalsIgnoreCase(compressionName)) {
+ if (
+ !compressionName.equals(ReplicationLogGroup.DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM)
+ ) {
try {
compression = Compression.getCompressionAlgorithmByName(compressionName);
} catch (IllegalArgumentException e) {
@@ -362,51 +164,21 @@ public class ReplicationLog {
}
}
this.compression = compression;
- this.ringBufferSize =
- conf.getInt(REPLICATION_LOG_RINGBUFFER_SIZE_KEY, DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE);
- this.syncTimeoutMs =
- conf.getLong(REPLICATION_LOG_SYNC_TIMEOUT_KEY, DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT);
- this.metrics = createMetricsSource();
- }
-
- /** Creates a new metrics source for monitoring replication log operations. */
- protected MetricsReplicationLogSource createMetricsSource() {
- return new MetricsReplicationLogSourceImpl();
- }
-
- /** Returns the metrics source for monitoring replication log operations. */
- public MetricsReplicationLogSource getMetrics() {
- return metrics;
+ this.ringBufferSize = conf.getInt(ReplicationLogGroup.REPLICATION_LOG_RINGBUFFER_SIZE_KEY,
+ ReplicationLogGroup.DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE);
+ this.syncTimeoutMs = conf.getLong(ReplicationLogGroup.REPLICATION_LOG_SYNC_TIMEOUT_KEY,
+ ReplicationLogGroup.DEFAULT_REPLICATION_LOG_SYNC_TIMEOUT);
}
- @SuppressWarnings("unchecked")
+ /** Initialize the writer. */
public void init() throws IOException {
- if (numShards > MAX_REPLICATION_NUM_SHARDS) {
- throw new IllegalArgumentException(REPLICATION_NUM_SHARDS_KEY + " is " + numShards
- + ", but the limit is " + MAX_REPLICATION_NUM_SHARDS);
- }
initializeFileSystems();
// Start time based rotation.
lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
startRotationExecutor();
- // Create the initial writer. Do this before we call LogEventHandler.init().
- currentWriter = createNewWriter(standbyFs, standbyUrl);
- // Initialize the Disruptor. We use ProducerType.MULTI because multiple handlers might
- // call append concurrently. We use YieldingWaitStrategy for low latency. When the ring
- // buffer is full (controlled by REPLICATION_WRITER_RINGBUFFER_SIZE_KEY), producers
- // calling ringBuffer.next() will effectively block (by yielding/spinning), creating
- // backpressure on the callers. This ensures appends don't proceed until there is space.
- disruptor = new Disruptor<>(
- LogEvent.EVENT_FACTORY, ringBufferSize, new ThreadFactoryBuilder()
- .setNameFormat("ReplicationLogEventHandler-%d").setDaemon(true).build(),
- ProducerType.MULTI, new YieldingWaitStrategy());
- LogEventHandler eventHandler = new LogEventHandler();
- eventHandler.init();
- disruptor.handleEventsWith(eventHandler);
- LogExceptionHandler exceptionHandler = new LogExceptionHandler();
- disruptor.setDefaultExceptionHandler(exceptionHandler);
- ringBuffer = disruptor.start();
- LOG.info("ReplicationLogWriter started with ring buffer size {}", ringBufferSize);
+ // Create the initial writer. Do this before we initialize the Disruptor.
+ currentWriter = createNewWriter();
+ initializeDisruptor();
}
/**
@@ -428,10 +200,9 @@ public class ReplicationLog {
if (LOG.isTraceEnabled()) {
LOG.trace("Append: table={}, commitId={}, mutation={}", tableName, commitId, mutation);
}
- if (isClosed) {
+ if (closed) {
throw new IOException("Closed");
}
- long startTime = System.nanoTime();
// ringBuffer.next() claims the next sequence number. Because we initialize the Disruptor
// with ProducerType.MULTI and the blocking YieldingWaitStrategy this call WILL BLOCK if
// the ring buffer is full, thus providing backpressure to the callers.
@@ -439,7 +210,6 @@ public class ReplicationLog {
try {
LogEvent event = ringBuffer.get(sequence);
event.setValues(EVENT_TYPE_DATA, new Record(tableName, commitId, mutation), null);
- metrics.updateAppendTime(System.nanoTime() - startTime);
} finally {
// Update ring buffer events metric
ringBuffer.publish(sequence);
@@ -462,18 +232,44 @@ public class ReplicationLog {
* @throws IOException If the sync operation fails after retries, or if interrupted.
*/
public void sync() throws IOException {
- if (isClosed) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Sync");
+ }
+ if (closed) {
throw new IOException("Closed");
}
syncInternal();
}
+ /** Initialize file systems needed by this writer implementation. */
+ protected abstract void initializeFileSystems() throws IOException;
+
+ /**
+ * Create a new log writer for rotation.
+ */
+ protected abstract LogFileWriter createNewWriter() throws IOException;
+
+ /** Initialize the Disruptor. */
+ @SuppressWarnings("unchecked")
+ protected void initializeDisruptor() throws IOException {
+ disruptor = new Disruptor<>(LogEvent.EVENT_FACTORY, ringBufferSize,
+ new ThreadFactoryBuilder()
+ .setNameFormat("ReplicationLogGroupWriter-" + logGroup.getHaGroupName() + "-%d")
+ .setDaemon(true).build(),
+ ProducerType.MULTI, new YieldingWaitStrategy());
+ LogEventHandler eventHandler = new LogEventHandler();
+ eventHandler.init();
+ disruptor.handleEventsWith(eventHandler);
+ LogExceptionHandler exceptionHandler = new LogExceptionHandler();
+ disruptor.setDefaultExceptionHandler(exceptionHandler);
+ ringBuffer = disruptor.start();
+ }
+
/**
* Internal implementation of sync that publishes a sync event to the ring buffer and waits for
* completion.
*/
protected void syncInternal() throws IOException {
- long startTime = System.nanoTime();
CompletableFuture<Void> syncFuture = new CompletableFuture<>();
long sequence = ringBuffer.next();
try {
@@ -486,10 +282,7 @@ public class ReplicationLog {
try {
// Wait for the event handler to process up to and including this sync event
syncFuture.get(syncTimeoutMs, TimeUnit.MILLISECONDS);
- metrics.updateSyncTime(System.nanoTime() - startTime);
} catch (InterruptedException e) {
- // Almost certainly the regionserver is shutting down or aborting.
- // TODO: Do we need to do more here?
Thread.currentThread().interrupt();
throw new InterruptedIOException("Interrupted while waiting for sync");
} catch (ExecutionException e) {
@@ -506,90 +299,32 @@ public class ReplicationLog {
}
}
- /** Initializes the standby and fallback filesystems and creates their log directories. */
- protected void initializeFileSystems() throws IOException {
- String standbyUrlString = conf.get(REPLICATION_STANDBY_HDFS_URL_KEY);
- if (standbyUrlString == null) {
- throw new IOException(REPLICATION_STANDBY_HDFS_URL_KEY + " is not configured");
- }
- // Only validate that the URI is well formed. We should not assume the scheme must be
- // "hdfs" because perhaps the operator will substitute another FileSystem implementation
- // for DistributedFileSystem.
- try {
- this.standbyUrl = new URI(standbyUrlString);
- } catch (URISyntaxException e) {
- throw new IOException(REPLICATION_STANDBY_HDFS_URL_KEY + " is not valid", e);
- }
- String fallbackUrlString = conf.get(REPLICATION_FALLBACK_HDFS_URL_KEY);
- if (fallbackUrlString != null) {
- // Only validate that the URI is well formed, as above.
- try {
- this.fallbackUrl = new URI(fallbackUrlString);
- } catch (URISyntaxException e) {
- throw new IOException(REPLICATION_FALLBACK_HDFS_URL_KEY + " is not valid", e);
- }
- this.fallbackFs = getFileSystem(fallbackUrl);
- Path fallbackLogDir = new Path(fallbackUrl.getPath());
- if (!fallbackFs.exists(fallbackLogDir)) {
- LOG.info("Creating directory {}", fallbackUrlString);
- if (!this.fallbackFs.mkdirs(fallbackLogDir)) {
- throw new IOException("Failed to create directory: " + fallbackUrlString);
- }
- }
- } else {
- // We support a synchronous replication only option if store-and-forward configuration
- // keys are missing. This is outside the scope of the design spec but potentially
- // useful for testing and also allows an operator to prefer failover consistency and
- // simplicity over availability, even if that is not recommended. Log it at WARN level
- // to focus appropriate attention. (Should it be ERROR?)
- LOG.warn("Fallback not configured ({}), store-and-forward DISABLED.",
- REPLICATION_FALLBACK_HDFS_URL_KEY);
- this.fallbackFs = null;
- }
- // Configuration is sorted, and possibly store-and-forward directories have been created,
- // now create the standby side directories as needed.
- this.standbyFs = getFileSystem(standbyUrl);
- Path standbyLogDir = new Path(standbyUrl.getPath());
- if (!standbyFs.exists(standbyLogDir)) {
- LOG.info("Creating directory {}", standbyUrlString);
- if (!standbyFs.mkdirs(standbyLogDir)) {
- throw new IOException("Failed to create directory: " + standbyUrlString);
- }
- }
- }
-
- /** Gets a FileSystem instance for the given URI using the current configuration. */
- protected FileSystem getFileSystem(URI uri) throws IOException {
- return FileSystem.get(uri, conf);
+ protected void startRotationExecutor() {
+ long rotationCheckInterval = getRotationCheckInterval(rotationTimeMs);
+ rotationExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+ .setNameFormat("ReplicationLogRotation-" + logGroup.getHaGroupName() + "-%d").setDaemon(true)
+ .build());
+ rotationExecutor.scheduleAtFixedRate(new LogRotationTask(), rotationCheckInterval,
+ rotationCheckInterval, TimeUnit.MILLISECONDS);
+ LOG.debug("Started rotation executor with interval {}ms", rotationCheckInterval);
}
- /** Calculates the interval for checking log rotation based on the configured rotation time. */
protected long getRotationCheckInterval(long rotationTimeMs) {
- long interval;
- if (rotationTimeMs > 0) {
- interval = rotationTimeMs / 4;
- } else {
- // If rotation time is not configured or invalid, use a sensible default like 10 seconds
- interval = 10000L;
- }
+ long interval = Math.max(10 * 1000L, Math.min(60 * 1000L, rotationTimeMs / 10));
return interval;
}
- /** Starts the background task for time-based log rotation. */
- protected void startRotationExecutor() {
- Preconditions.checkState(rotationExecutor == null, "Rotation executor already started");
- rotationExecutor = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("ReplicationLogRotator-%d").setDaemon(true).build());
- long interval = getRotationCheckInterval(rotationTimeMs);
- rotationExecutor.scheduleWithFixedDelay(new LogRotationTask(), interval, interval,
- TimeUnit.MILLISECONDS);
- }
-
- /** Stops the background task for time-based log rotation. */
protected void stopRotationExecutor() {
if (rotationExecutor != null) {
- rotationExecutor.shutdownNow();
- rotationExecutor = null;
+ rotationExecutor.shutdown();
+ try {
+ if (!rotationExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ rotationExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ rotationExecutor.shutdownNow();
+ }
}
}
@@ -664,7 +399,7 @@ public class ReplicationLog {
try {
// Try to get the new writer first. If it fails we continue using the current writer.
// Increment the writer generation
- LogFileWriter newWriter = createNewWriter(standbyFs, standbyUrl);
+ LogFileWriter newWriter = createNewWriter();
LOG.debug("Created new writer: {}", newWriter);
// Close the current writer
if (currentWriter != null) {
@@ -674,23 +409,23 @@ public class ReplicationLog {
currentWriter = newWriter;
lastRotationTime.set(EnvironmentEdgeManager.currentTimeMillis());
rotationFailures.set(0);
- metrics.incrementRotationCount();
+ logGroup.getMetrics().incrementRotationCount();
switch (reason) {
case TIME:
- metrics.incrementTimeBasedRotationCount();
+ logGroup.getMetrics().incrementTimeBasedRotationCount();
break;
case SIZE:
- metrics.incrementSizeBasedRotationCount();
+ logGroup.getMetrics().incrementSizeBasedRotationCount();
break;
case ERROR:
- metrics.incrementErrorBasedRotationCount();
+ logGroup.getMetrics().incrementErrorBasedRotationCount();
break;
}
} catch (IOException e) {
// If we fail to rotate the log, we increment the failure counter. If we have exceeded
// the maximum number of retries, we close the log and throw the exception. Otherwise
// we log a warning and continue.
- metrics.incrementRotationFailureCount();
+ logGroup.getMetrics().incrementRotationFailureCount();
long numFailures = rotationFailures.getAndIncrement();
if (numFailures >= maxRotationRetries) {
LOG.warn("Failed to rotate log (attempt {}/{}), closing log", numFailures,
@@ -706,57 +441,6 @@ public class ReplicationLog {
return currentWriter;
}
- /**
- * Creates a new log file path in a sharded directory structure based on server name and
- * timestamp.
- */
- protected Path makeWriterPath(FileSystem fs, URI url) throws IOException {
- long timestamp = EnvironmentEdgeManager.currentTimeMillis();
- // To have all logs for a given regionserver appear in the same shard, hash only the
- // serverName. However we expect some regionservers will have significantly more load than
- // others so we instead distribute the logs over all of the shards randomly for a more even
- // overall distribution by also hashing the timestamp.
- int shard = (serverName.hashCode() ^ Long.hashCode(timestamp)) % numShards;
- Path shardPath = new Path(url.getPath(), String.format(SHARD_DIR_FORMAT, shard));
- // Ensure the shard directory exists. We track which shard directories we have probed or
- // created to avoid a round trip to the namenode for repeats.
- IOException[] exception = new IOException[1];
- shardMap.computeIfAbsent(shardPath, p -> {
- try {
- if (!fs.exists(p)) {
- if (!fs.mkdirs(p)) {
- throw new IOException("Could not create path: " + p);
- }
- }
- } catch (IOException e) {
- exception[0] = e;
- }
- return p;
- });
- // If we faced an exception in computeIfAbsent, throw it
- if (exception[0] != null) {
- throw exception[0];
- }
- Path filePath = new Path(shardPath, String.format(FILE_NAME_FORMAT, timestamp, serverName));
- return filePath;
- }
-
- /** Creates and initializes a new LogFileWriter for the given filesystem and URL. */
- protected LogFileWriter createNewWriter(FileSystem fs, URI url) throws IOException {
- Path filePath = makeWriterPath(fs, url);
- LogFileWriterContext writerContext = new LogFileWriterContext(conf).setFileSystem(fs)
- .setFilePath(filePath).setCompression(compression);
- LogFileWriter newWriter = new LogFileWriter();
- try {
- newWriter.init(writerContext);
- newWriter.setGeneration(writerGeneration.incrementAndGet());
- } catch (IOException e) {
- LOG.error("Failed to initialize new LogFileWriter for path {}", filePath, e);
- throw e;
- }
- return newWriter;
- }
-
/** Closes the given writer, logging any errors that occur during close. */
protected void closeWriter(LogFileWriter writer) {
if (writer == null) {
@@ -770,6 +454,14 @@ public class ReplicationLog {
}
}
+ /**
+ * Check if this ReplicationLogGroup is closed.
+ * @return true if closed, false otherwise
+ */
+ public boolean isClosed() {
+ return closed;
+ }
+
/**
* Force closes the log upon an unrecoverable internal error. This is a fail-stop behavior: once
* called, the log is marked as closed, the Disruptor is halted, and all subsequent append() and
@@ -779,10 +471,10 @@ public class ReplicationLog {
protected void closeOnError() {
lock.lock();
try {
- if (isClosed) {
+ if (closed) {
return;
}
- isClosed = true;
+ closed = true;
} finally {
lock.unlock();
}
@@ -799,10 +491,10 @@ public class ReplicationLog {
public void close() {
lock.lock();
try {
- if (isClosed) {
+ if (closed) {
return;
}
- isClosed = true;
+ closed = true;
} finally {
lock.unlock();
}
@@ -820,11 +512,15 @@ public class ReplicationLog {
closeWriter(currentWriter);
}
+ protected FileSystem getFileSystem(URI uri) throws IOException {
+ return FileSystem.get(uri, logGroup.getConfiguration());
+ }
+
/** Implements time based rotation independent of in-line checking. */
protected class LogRotationTask implements Runnable {
@Override
public void run() {
- if (isClosed) {
+ if (closed) {
return;
}
// Use tryLock with a timeout to avoid blocking indefinitely if another thread holds
@@ -837,7 +533,7 @@ public class ReplicationLog {
// Check only the time condition here, size is handled by getWriter
long now = EnvironmentEdgeManager.currentTimeMillis();
long last = lastRotationTime.get();
- if (!isClosed && now - last >= rotationTimeMs) {
+ if (!closed && now - last >= rotationTimeMs) {
LOG.debug("Time based rotation needed ({} ms elapsed, threshold {} ms).", now - last,
rotationTimeMs);
try {
@@ -903,10 +599,11 @@ public class ReplicationLog {
protected long generation;
protected LogEventHandler() {
- this.maxRetries =
- conf.getInt(REPLICATION_LOG_SYNC_RETRIES_KEY, DEFAULT_REPLICATION_LOG_SYNC_RETRIES);
- this.retryDelayMs =
- conf.getLong(REPLICATION_LOG_RETRY_DELAY_MS_KEY, DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS);
+ Configuration conf = logGroup.getConfiguration();
+ this.maxRetries = conf.getInt(ReplicationLogGroup.REPLICATION_LOG_SYNC_RETRIES_KEY,
+ ReplicationLogGroup.DEFAULT_REPLICATION_LOG_SYNC_RETRIES);
+ this.retryDelayMs = conf.getLong(ReplicationLogGroup.REPLICATION_LOG_RETRY_DELAY_MS_KEY,
+ ReplicationLogGroup.DEFAULT_REPLICATION_LOG_RETRY_DELAY_MS);
}
protected void init() throws IOException {
@@ -997,7 +694,7 @@ public class ReplicationLog {
// Calculate time spent in ring buffer
long currentTimeNs = System.nanoTime();
long ringBufferTimeNs = currentTimeNs - event.timestampNs;
- metrics.updateRingBufferTime(ringBufferTimeNs);
+ logGroup.getMetrics().updateRingBufferTime(ringBufferTimeNs);
writer = getWriter();
int attempt = 0;
while (attempt < maxRetries) {
@@ -1081,5 +778,4 @@ public class ReplicationLog {
closeOnError();
}
}
-
}
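The abstract hooks above, initializeFileSystems() and createNewWriter(), are what concrete writers must supply; everything else (ring buffer, rotation, sync retry) lives in the base class. A hypothetical test-only subclass, assumed to live in org.apache.phoenix.replication so it can reach the protected base-class members (compression, writerGeneration, getFileSystem()) and the ReplicationLogGroup accessors, might look like the sketch below. The class and its LOCAL_DIR_KEY are illustrations, not part of this commit:

    package org.apache.phoenix.replication;

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.phoenix.replication.log.LogFileWriter;
    import org.apache.phoenix.replication.log.LogFileWriterContext;
    import org.apache.phoenix.util.EnvironmentEdgeManager;

    /** Hypothetical test-only writer that logs to a single local directory. Not part of this commit. */
    public class LocalDirLogGroupWriter extends ReplicationLogGroupWriter {

      /** Hypothetical configuration key, for illustration only. */
      public static final String LOCAL_DIR_KEY = "phoenix.replication.log.test.local.dir";

      private FileSystem fs;
      private Path baseDir;

      public LocalDirLogGroupWriter(ReplicationLogGroup logGroup) {
        super(logGroup);
      }

      @Override
      protected void initializeFileSystems() throws IOException {
        // Resolve the target directory and make sure it exists before the first writer is created.
        baseDir = new Path(logGroup.getConfiguration().get(LOCAL_DIR_KEY, "/tmp/replication-log"));
        fs = getFileSystem(URI.create(baseDir.toString()));
        if (!fs.exists(baseDir) && !fs.mkdirs(baseDir)) {
          throw new IOException("Failed to create directory: " + baseDir);
        }
      }

      @Override
      protected LogFileWriter createNewWriter() throws IOException {
        // Reuse the base class rotation machinery; only the destination path differs.
        long timestamp = EnvironmentEdgeManager.currentTimeMillis();
        Path filePath = new Path(baseDir, String.format(ReplicationLogGroup.FILE_NAME_FORMAT,
          timestamp, logGroup.getServerName()));
        LogFileWriterContext context = new LogFileWriterContext(logGroup.getConfiguration())
          .setFileSystem(fs).setFilePath(filePath).setCompression(compression);
        LogFileWriter writer = new LogFileWriter();
        writer.init(context);
        writer.setGeneration(writerGeneration.incrementAndGet());
        return writer;
      }
    }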
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StandbyLogGroupWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StandbyLogGroupWriter.java
new file mode 100644
index 0000000000..5c75088608
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StandbyLogGroupWriter.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.apache.phoenix.replication.log.LogFileWriterContext;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Synchronous replication implementation of ReplicationLogGroupWriter.
+ * <p>
+ * This class implements synchronous replication to a standby cluster's HDFS. It writes replication
+ * logs directly to the standby cluster in synchronous mode, providing immediate consistency for
+ * failover scenarios.
+ */
+public class StandbyLogGroupWriter extends ReplicationLogGroupWriter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StandbyLogGroupWriter.class);
+
+ private FileSystem standbyFs;
+ private URI standbyUrl;
+ protected int numShards;
+ protected final ConcurrentHashMap<Path, Object> shardMap = new ConcurrentHashMap<>();
+
+ /**
+ * Constructor for StandbyLogGroupWriter.
+ */
+ public StandbyLogGroupWriter(ReplicationLogGroup logGroup) {
+ super(logGroup);
+ Configuration conf = logGroup.getConfiguration();
+ this.numShards = conf.getInt(ReplicationLogGroup.REPLICATION_NUM_SHARDS_KEY,
+ ReplicationLogGroup.DEFAULT_REPLICATION_NUM_SHARDS);
+ LOG.debug("Created StandbyLogGroupWriter for HA Group: {}", logGroup.getHaGroupName());
+ }
+
+ @Override
+ protected void initializeFileSystems() throws IOException {
+ if (numShards > ReplicationLogGroup.MAX_REPLICATION_NUM_SHARDS) {
+ throw new IllegalArgumentException(ReplicationLogGroup.REPLICATION_NUM_SHARDS_KEY + " is "
+ + numShards + ", but the limit is " + ReplicationLogGroup.MAX_REPLICATION_NUM_SHARDS);
+ }
+ Configuration conf = logGroup.getConfiguration();
+ String standbyUrlString = conf.get(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY);
+ if (standbyUrlString == null || standbyUrlString.trim().isEmpty()) {
+ throw new IOException(
+ "Standby HDFS URL not configured: " + ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY);
+ }
+ try {
+ standbyUrl = new URI(standbyUrlString);
+ standbyFs = getFileSystem(standbyUrl);
+ LOG.info("Initialized standby filesystem: {}", standbyUrl);
+ } catch (URISyntaxException e) {
+ throw new IOException("Invalid standby HDFS URL: " + standbyUrlString, e);
+ }
+ }
+
+ /**
+ * Creates a new log file path in a sharded directory structure based on server name and
+ * timestamp. The resulting path structure is
+ *
+ * <pre>
+ * [url]/[haGroupId]/[shard]/[timestamp]-[servername].plog
+ * </pre>
+ */
+ protected Path makeWriterPath(FileSystem fs, URI url) throws IOException {
+ Path haGroupPath = new Path(url.getPath(), logGroup.getHaGroupName());
+ long timestamp = EnvironmentEdgeManager.currentTimeMillis();
+ // To have all logs for a given regionserver appear in the same shard, hash only the
+ // serverName. However we expect some regionservers will have significantly more load than
+ // others so we instead distribute the logs over all of the shards randomly for a more even
+ // overall distribution by also hashing the timestamp.
+ int shard =
+ Math.floorMod(logGroup.getServerName().hashCode() ^ Long.hashCode(timestamp), numShards);
+ Path shardPath =
+ new Path(haGroupPath, String.format(ReplicationLogGroup.SHARD_DIR_FORMAT, shard));
+ // Ensure the shard directory exists. We track which shard directories we have probed or
+ // created to avoid a round trip to the namenode for repeats.
+ IOException[] exception = new IOException[1];
+ shardMap.computeIfAbsent(shardPath, p -> {
+ try {
+ if (!fs.exists(p)) {
+ fs.mkdirs(haGroupPath); // This probably exists, but just in case.
+ if (!fs.mkdirs(shardPath)) {
+ throw new IOException("Could not create path: " + p);
+ }
+ }
+ } catch (IOException e) {
+ exception[0] = e;
+ return null; // Don't cache the path if we can't create it.
+ }
+ return p;
+ });
+ // If we faced an exception in computeIfAbsent, throw it
+ if (exception[0] != null) {
+ throw exception[0];
+ }
+ Path filePath = new Path(shardPath,
+ String.format(ReplicationLogGroup.FILE_NAME_FORMAT, timestamp, logGroup.getServerName()));
+ return filePath;
+ }
+
+ /** Creates and initializes a new LogFileWriter. */
+ protected LogFileWriter createNewWriter() throws IOException {
+ Path filePath = makeWriterPath(standbyFs, standbyUrl);
+ LogFileWriterContext writerContext = new LogFileWriterContext(logGroup.getConfiguration())
+ .setFileSystem(standbyFs).setFilePath(filePath).setCompression(compression);
+ LogFileWriter newWriter = new LogFileWriter();
+ newWriter.init(writerContext);
+ newWriter.setGeneration(writerGeneration.incrementAndGet());
+ return newWriter;
+ }
+}
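To make the sharded layout concrete, here is a small sketch of how makeWriterPath() above composes the shard directory and file name from the SHARD_DIR_FORMAT ("%05d") and FILE_NAME_FORMAT ("%d-%s.plog") constants. The timestamp, server name, and HA group name are illustrative values only:

    import org.apache.hadoop.hbase.ServerName;
    import org.apache.phoenix.replication.ReplicationLogGroup;

    public class ShardPathSketch {
      public static void main(String[] args) {
        // Illustrative values only.
        ServerName serverName = ServerName.valueOf("rs1.example.com", 16020, 1734540000000L);
        long timestamp = 1734549144000L;
        int numShards = ReplicationLogGroup.DEFAULT_REPLICATION_NUM_SHARDS; // 1000
        // Same selection as makeWriterPath(): XOR the server name hash with the timestamp hash,
        // then floorMod into the shard space for an even spread across shards.
        int shard = Math.floorMod(serverName.hashCode() ^ Long.hashCode(timestamp), numShards);
        String shardDir = String.format(ReplicationLogGroup.SHARD_DIR_FORMAT, shard); // zero-padded five digits
        String fileName = String.format(ReplicationLogGroup.FILE_NAME_FORMAT, timestamp, serverName);
        // Layout under the standby URL: [url path]/[haGroupName]/[shardDir]/[fileName]
        System.out.println("ha-group-1/" + shardDir + "/" + fileName);
      }
    }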
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardLogGroupWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardLogGroupWriter.java
new file mode 100644
index 0000000000..c491ff82d9
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardLogGroupWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.IOException;
+import org.apache.phoenix.replication.log.LogFileWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Store-and-forward replication implementation of ReplicationLogGroupWriter.
+ * <p>
+ * This class is a stub implementation for future store-and-forward
replication functionality.
+ * Store-and-forward mode is used when the standby cluster is temporarily
unavailable - mutations
+ * are stored locally and forwarded when connectivity is restored.
+ * <p>
+ * Currently this is a stub whose overridden methods are left unimplemented (marked TODO).
+ * Future implementation will include:
+ * <ul>
+ * <li>Local storage of mutations when standby is unavailable</li>
+ * <li>Background forwarding when connectivity is restored</li>
+ * <li>Proper error handling and retry logic</li>
+ * <li>Integration with HA state management</li>
+ * <li>Dual-mode operation: local storage + forwarding</li>
+ * </ul>
+ */
+public class StoreAndForwardLogGroupWriter extends ReplicationLogGroupWriter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StoreAndForwardLogGroupWriter.class);
+
+ /**
+ * Constructor for StoreAndForwardLogGroupWriter.
+ */
+ public StoreAndForwardLogGroupWriter(ReplicationLogGroup logGroup) {
+ super(logGroup);
+ LOG.debug("Created StoreAndForwardLogGroupWriter for HA Group: {}",
logGroup.getHaGroupName());
+ }
+
+ @Override
+ public void init() throws IOException {
+ // TODO
+ }
+
+ @Override
+ public void close() {
+ // TODO
+ }
+
+ @Override
+ protected void initializeFileSystems() throws IOException {
+ // TODO
+ }
+
+ @Override
+ protected LogFileWriter createNewWriter() throws IOException {
+ // TODO
+ return null;
+ }
+}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSource.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
similarity index 91%
rename from
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSource.java
rename to
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
index 907e3398fe..798c048136 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSource.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
@@ -19,12 +19,12 @@ package org.apache.phoenix.replication.metrics;
import org.apache.hadoop.hbase.metrics.BaseSource;
-/** Interface for metrics related to ReplicationLog operations. */
-public interface MetricsReplicationLogSource extends BaseSource {
+/** Interface for metrics related to ReplicationLogGroup operations. */
+public interface MetricsReplicationLogGroupSource extends BaseSource {
- String METRICS_NAME = "ReplicationLog";
+ String METRICS_NAME = "ReplicationLogGroup";
String METRICS_CONTEXT = "phoenix";
- String METRICS_DESCRIPTION = "Metrics about Phoenix Replication Log
Operations";
+ String METRICS_DESCRIPTION = "Metrics about Replication Log Operations for
an HA Group";
String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
String TIME_BASED_ROTATION_COUNT = "timeBasedRotationCount";
@@ -108,7 +108,11 @@ public interface MetricsReplicationLogSource extends
BaseSource {
*/
void incrementRotationFailureCount();
+ /**
+ * Unregister this metrics source.
+ */
+ void close();
+
// Get current values for testing
ReplicationLogMetricValues getCurrentMetricValues();
-
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSourceImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
similarity index 86%
rename from
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSourceImpl.java
rename to
phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
index fcb08efde9..7b3bb5789c 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogSourceImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
@@ -18,12 +18,13 @@
package org.apache.phoenix.replication.metrics;
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.hadoop.metrics2.lib.MutableHistogram;
/** Implementation of metrics source for ReplicationLog operations. */
-public class MetricsReplicationLogSourceImpl extends BaseSourceImpl
- implements MetricsReplicationLogSource {
+public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl
+ implements MetricsReplicationLogGroupSource {
private final MutableFastCounter timeBasedRotationCount;
private final MutableFastCounter sizeBasedRotationCount;
@@ -35,13 +36,14 @@ public class MetricsReplicationLogSourceImpl extends
BaseSourceImpl
private final MutableHistogram rotationTime;
private final MutableHistogram ringBufferTime;
- public MetricsReplicationLogSourceImpl() {
- this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT,
METRICS_JMX_CONTEXT);
+ public MetricsReplicationLogGroupSourceImpl(String haGroupName) {
+ this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT,
METRICS_JMX_CONTEXT, haGroupName);
}
- public MetricsReplicationLogSourceImpl(String metricsName, String
metricsDescription,
- String metricsContext, String metricsJmxContext) {
- super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+ public MetricsReplicationLogGroupSourceImpl(String metricsName, String
metricsDescription,
+ String metricsContext, String metricsJmxContext, String haGroupName) {
+ super(metricsName, metricsDescription, metricsContext,
+ metricsJmxContext + ",haGroup=" + haGroupName);
timeBasedRotationCount =
getMetricsRegistry().newCounter(TIME_BASED_ROTATION_COUNT,
TIME_BASED_ROTATION_COUNT_DESC, 0L);
sizeBasedRotationCount =
getMetricsRegistry().newCounter(SIZE_BASED_ROTATION_COUNT,
@@ -57,6 +59,11 @@ public class MetricsReplicationLogSourceImpl extends
BaseSourceImpl
ringBufferTime = getMetricsRegistry().newHistogram(RING_BUFFER_TIME,
RING_BUFFER_TIME_DESC);
}
+ @Override
+ public void close() {
+ DefaultMetricsSystem.instance().unregisterSource(metricsJmxContext);
+ }
+
@Override
public void incrementTimeBasedRotationCount() {
timeBasedRotationCount.incr();
@@ -124,10 +131,4 @@ public class MetricsReplicationLogSourceImpl extends
BaseSourceImpl
public String getMetricsContext() {
return METRICS_CONTEXT;
}
-
- @Override
- public String getMetricsJmxContext() {
- return METRICS_JMX_CONTEXT;
- }
-
}
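[Editor's note] The rename above also scopes the metrics source to an HA group: the haGroupName is appended to the JMX context, and the new close() unregisters the source so the same group can be registered again later. A minimal lifecycle sketch, assuming only the constructor and methods visible in this patch (in practice the regionserver, not application code, owns these sources):

import org.apache.phoenix.replication.metrics.MetricsReplicationLogGroupSource;
import org.apache.phoenix.replication.metrics.MetricsReplicationLogGroupSourceImpl;

public class MetricsLifecycleSketch {
  public static void main(String[] args) {
    // One source per HA group; the JMX context becomes
    // "RegionServer,sub=ReplicationLogGroup,haGroup=<name>", so two groups
    // register side by side without colliding.
    MetricsReplicationLogGroupSource groupA = new MetricsReplicationLogGroupSourceImpl("groupA");
    MetricsReplicationLogGroupSource groupB = new MetricsReplicationLogGroupSourceImpl("groupB");
    groupA.incrementTimeBasedRotationCount();
    // close() unregisters the source from the metrics system, freeing the
    // JMX context for a later re-registration of the same HA group.
    groupA.close();
    groupB.close();
  }
}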
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
similarity index 73%
rename from
phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
rename to
phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index ced1527400..e6e5818430 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -43,25 +43,20 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.phoenix.replication.ReplicationLog.RotationReason;
+import org.apache.phoenix.replication.ReplicationLogGroupWriter.RotationReason;
import org.apache.phoenix.replication.log.LogFile;
import org.apache.phoenix.replication.log.LogFileReader;
import org.apache.phoenix.replication.log.LogFileReaderContext;
import org.apache.phoenix.replication.log.LogFileTestUtil;
import org.apache.phoenix.replication.log.LogFileWriter;
-import org.apache.phoenix.replication.metrics.MetricsReplicationLogSource;
-import org.apache.phoenix.replication.metrics.MetricsReplicationLogSourceImpl;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.Before;
@@ -75,9 +70,9 @@ import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ReplicationLogTest {
+public class ReplicationLogGroupTest {
- private static final Logger LOG =
LoggerFactory.getLogger(ReplicationLogTest.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(ReplicationLogGroupTest.class);
@ClassRule
public static TemporaryFolder testFolder = new TemporaryFolder();
@@ -86,7 +81,7 @@ public class ReplicationLogTest {
private ServerName serverName;
private FileSystem localFs;
private URI standbyUri;
- private ReplicationLog logWriter;
+ private ReplicationLogGroup logGroup;
static final int TEST_RINGBUFFER_SIZE = 32;
static final int TEST_SYNC_TIMEOUT = 1000;
@@ -99,29 +94,26 @@ public class ReplicationLogTest {
localFs = FileSystem.getLocal(conf);
standbyUri = new Path(testFolder.toString()).toUri();
serverName = ServerName.valueOf("test", 60010,
EnvironmentEdgeManager.currentTimeMillis());
- conf.set(ReplicationLog.REPLICATION_STANDBY_HDFS_URL_KEY,
standbyUri.toString());
+ conf.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY,
standbyUri.toString());
// Small ring buffer size for testing
- conf.setInt(ReplicationLog.REPLICATION_LOG_RINGBUFFER_SIZE_KEY,
TEST_RINGBUFFER_SIZE);
+ conf.setInt(ReplicationLogGroup.REPLICATION_LOG_RINGBUFFER_SIZE_KEY,
TEST_RINGBUFFER_SIZE);
// Set a short sync timeout for testing
- conf.setLong(ReplicationLog.REPLICATION_LOG_SYNC_TIMEOUT_KEY,
TEST_SYNC_TIMEOUT);
+ conf.setLong(ReplicationLogGroup.REPLICATION_LOG_SYNC_TIMEOUT_KEY,
TEST_SYNC_TIMEOUT);
// Set rotation time to 10 seconds
- conf.setLong(ReplicationLog.REPLICATION_LOG_ROTATION_TIME_MS_KEY,
TEST_ROTATION_TIME);
+ conf.setLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_TIME_MS_KEY,
TEST_ROTATION_TIME);
// Small size threshold for testing
- conf.setLong(ReplicationLog.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
TEST_ROTATION_SIZE_BYTES);
+ conf.setLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY,
+ TEST_ROTATION_SIZE_BYTES);
- logWriter = spy(new TestableReplicationLog(conf, serverName));
- logWriter.init();
+ logGroup = new TestableLogGroup(conf, serverName, "testHAGroup");
+ logGroup.init();
}
@After
public void tearDown() throws Exception {
- if (logWriter != null) {
- logWriter.close();
+ if (logGroup != null) {
+ logGroup.close();
}
- // Deregister the metrics source that the replication log registers during
initialization
- // so the next unit will be able to register it again and successfully
initialize.
- DefaultMetricsSystem.instance()
- .unregisterSource(MetricsReplicationLogSource.METRICS_JMX_CONTEXT);
}
/**
@@ -143,17 +135,17 @@ public class ReplicationLogTest {
final Mutation put5 = LogFileTestUtil.newPut("row5", 5, 1);
// Get the inner writer
- LogFileWriter writer = logWriter.getWriter();
+ LogFileWriter writer = logGroup.getActiveWriter().getWriter();
assertNotNull("Writer should not be null", writer);
InOrder inOrder = Mockito.inOrder(writer);
- logWriter.append(tableName, commitId1, put1);
- logWriter.append(tableName, commitId2, put2);
- logWriter.append(tableName, commitId3, put3);
- logWriter.append(tableName, commitId4, put4);
- logWriter.append(tableName, commitId5, put5);
+ logGroup.append(tableName, commitId1, put1);
+ logGroup.append(tableName, commitId2, put2);
+ logGroup.append(tableName, commitId3, put3);
+ logGroup.append(tableName, commitId4, put4);
+ logGroup.append(tableName, commitId5, put5);
- logWriter.sync();
+ logGroup.sync();
// Happens-before ordering verification, using Mockito's inOrder. Verify
that the appends
// happen before sync, and sync happened after appends.
@@ -165,40 +157,6 @@ public class ReplicationLogTest {
inOrder.verify(writer, times(1)).sync();
}
- /**
- * Tests the behavior when an append operation fails. Verifies that the
system properly handles
- * append failures by rolling to a new writer and retrying the operation.
- */
- @Test
- public void testAppendFailureAndRetry() throws Exception {
- final String tableName = "TBLAFR";
- final long commitId = 1L;
- final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
-
- // Get the inner writer
- LogFileWriter writerBeforeRoll = logWriter.getWriter();
- assertNotNull("Initial writer should not be null", writerBeforeRoll);
-
- // Configure writerBeforeRoll to fail on the first append call
- doThrow(new IOException("Simulated append
failure")).when(writerBeforeRoll).append(anyString(),
- anyLong(), any(Mutation.class));
-
- // Append data
- logWriter.append(tableName, commitId, put);
- logWriter.sync();
-
- // Get the inner writer we rolled to.
- LogFileWriter writerAfterRoll = logWriter.getWriter();
- assertNotNull("Rolled writer should not be null", writerAfterRoll);
-
- // Verify the sequence: append (fail), rotate, append (succeed), sync
- InOrder inOrder = Mockito.inOrder(writerBeforeRoll, writerAfterRoll);
- inOrder.verify(writerBeforeRoll, times(1)).append(eq(tableName),
eq(commitId), eq(put));
- inOrder.verify(writerBeforeRoll, times(0)).sync(); // We failed append,
did not try
- inOrder.verify(writerAfterRoll, times(1)).append(eq(tableName),
eq(commitId), eq(put)); // Retry
- inOrder.verify(writerAfterRoll, times(1)).sync();
- }
-
/**
* Tests the behavior when a sync operation fails. Verifies that the system
properly handles sync
* failures by rolling to a new writer and retrying the operation.
@@ -210,18 +168,18 @@ public class ReplicationLogTest {
final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
// Get the inner writer
- LogFileWriter writerBeforeRoll = logWriter.getWriter();
+ LogFileWriter writerBeforeRoll = logGroup.getActiveWriter().getWriter();
assertNotNull("Initial writer should not be null", writerBeforeRoll);
// Configure writerBeforeRoll to fail on the first sync call
doThrow(new IOException("Simulated sync
failure")).when(writerBeforeRoll).sync();
// Append data
- logWriter.append(tableName, commitId, put);
- logWriter.sync();
+ logGroup.append(tableName, commitId, put);
+ logGroup.sync();
// Get the inner writer we rolled to.
- LogFileWriter writerAfterRoll = logWriter.getWriter();
+ LogFileWriter writerAfterRoll = logGroup.getActiveWriter().getWriter();
assertNotNull("Initial writer should not be null", writerBeforeRoll);
// Verify the sequence: append, sync (fail), rotate, append (retry), sync
(succeed)
@@ -243,7 +201,7 @@ public class ReplicationLogTest {
long commitId = 0;
// Get the inner writer
- LogFileWriter innerWriter = logWriter.getWriter();
+ LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
assertNotNull("Inner writer should not be null", innerWriter);
// Create a slow consumer to fill up the ring buffer.
@@ -257,7 +215,7 @@ public class ReplicationLogTest {
// Fill up the ring buffer by sending enough events.
for (int i = 0; i < TEST_RINGBUFFER_SIZE; i++) {
- logWriter.append(tableName, commitId++, put);
+ logGroup.append(tableName, commitId++, put);
}
// Now try to append when the ring is full. This should block until space
becomes
@@ -268,7 +226,7 @@ public class ReplicationLogTest {
Thread appendThread = new Thread(() -> {
try {
startFuture.complete(null);
- logWriter.append(tableName, myCommitId, put);
+ logGroup.append(tableName, myCommitId, put);
appendFuture.complete(null);
} catch (IOException e) {
appendFuture.completeExceptionally(e);
@@ -293,6 +251,40 @@ public class ReplicationLogTest {
verify(innerWriter, timeout(10000).times(1)).append(eq(tableName),
eq(myCommitId), any());
}
+ /**
+ * Tests the behavior when an append operation fails. Verifies that the
system properly handles
+ * append failures by rolling to a new writer and retrying the operation.
+ */
+ @Test
+ public void testAppendFailureAndRetry() throws Exception {
+ final String tableName = "TBLAFR";
+ final long commitId = 1L;
+ final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+ // Get the inner writer
+ LogFileWriter writerBeforeRoll = logGroup.getActiveWriter().getWriter();
+ assertNotNull("Initial writer should not be null", writerBeforeRoll);
+
+ // Configure writerBeforeRoll to fail on the first append call
+ doThrow(new IOException("Simulated append
failure")).when(writerBeforeRoll).append(anyString(),
+ anyLong(), any(Mutation.class));
+
+ // Append data
+ logGroup.append(tableName, commitId, put);
+ logGroup.sync();
+
+ // Get the inner writer we rolled to.
+ LogFileWriter writerAfterRoll = logGroup.getActiveWriter().getWriter();
+ assertNotNull("Rolled writer should not be null", writerAfterRoll);
+
+ // Verify the sequence: append (fail), rotate, append (succeed), sync
+ InOrder inOrder = Mockito.inOrder(writerBeforeRoll, writerAfterRoll);
+ inOrder.verify(writerBeforeRoll, times(1)).append(eq(tableName),
eq(commitId), eq(put));
+ inOrder.verify(writerBeforeRoll, times(0)).sync(); // We failed append,
did not try
+ inOrder.verify(writerAfterRoll, times(1)).append(eq(tableName),
eq(commitId), eq(put)); // Retry
+ inOrder.verify(writerAfterRoll, times(1)).sync();
+ }
+
/**
* Tests the sync timeout behavior. Verifies that sync operations time out
after the configured
* interval if they cannot complete.
@@ -304,7 +296,7 @@ public class ReplicationLogTest {
final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
// Get the inner writer
- LogFileWriter innerWriter = logWriter.getWriter();
+ LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
assertNotNull("Inner writer should not be null", innerWriter);
doAnswer(new Answer<Object>() {
@@ -317,11 +309,11 @@ public class ReplicationLogTest {
}).when(innerWriter).sync();
// Append some data
- logWriter.append(tableName, commitId, put);
+ logGroup.append(tableName, commitId, put);
// Try to sync and expect it to timeout
try {
- logWriter.sync();
+ logGroup.sync();
fail("Expected sync to timeout");
} catch (IOException e) {
assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
@@ -342,7 +334,7 @@ public class ReplicationLogTest {
final CountDownLatch completionLatch = new CountDownLatch(2);
// Get the inner writer
- LogFileWriter innerWriter = logWriter.getWriter();
+ LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
assertNotNull("Inner writer should not be null", innerWriter);
// Thread 1: Append mutations with even commit IDs
@@ -352,7 +344,7 @@ public class ReplicationLogTest {
for (int i = 0; i < APPENDS_PER_THREAD; i++) {
final long commitId = i * 2;
final Mutation put = LogFileTestUtil.newPut("row" + commitId,
commitId, 1);
- logWriter.append(tableName, commitId, put);
+ logGroup.append(tableName, commitId, put);
}
} catch (Exception e) {
fail("Producer 1 failed: " + e.getMessage());
@@ -368,7 +360,7 @@ public class ReplicationLogTest {
for (int i = 0; i < APPENDS_PER_THREAD; i++) {
final long commitId = i * 2 + 1;
final Mutation put = LogFileTestUtil.newPut("row" + commitId,
commitId, 1);
- logWriter.append(tableName, commitId, put);
+ logGroup.append(tableName, commitId, put);
}
} catch (Exception e) {
fail("Producer 2 failed: " + e.getMessage());
@@ -387,7 +379,7 @@ public class ReplicationLogTest {
// Perform a sync to ensure all appends are processed.
InOrder inOrder = Mockito.inOrder(innerWriter); // To verify the below
sync.
- logWriter.sync();
+ logGroup.sync();
// Verify the final sync was called.
inOrder.verify(innerWriter, times(1)).sync();
@@ -396,7 +388,6 @@ public class ReplicationLogTest {
final long commitId = i;
verify(innerWriter, times(1)).append(eq(tableName), eq(commitId), any());
}
-
}
/**
@@ -410,22 +401,22 @@ public class ReplicationLogTest {
final long commitId = 1L;
// Get the initial writer
- LogFileWriter writerBeforeRotation = logWriter.getWriter();
+ LogFileWriter writerBeforeRotation =
logGroup.getActiveWriter().getWriter();
assertNotNull("Initial writer should not be null", writerBeforeRotation);
// Append some data
- logWriter.append(tableName, commitId, put);
- logWriter.sync();
+ logGroup.append(tableName, commitId, put);
+ logGroup.sync();
// Wait for rotation time to elapse
Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
// Append more data to trigger rotation check
- logWriter.append(tableName, commitId + 1, put);
- logWriter.sync();
+ logGroup.append(tableName, commitId + 1, put);
+ logGroup.sync();
// Get the new writer after rotation
- LogFileWriter writerAfterRotation = logWriter.getWriter();
+ LogFileWriter writerAfterRotation = logGroup.getActiveWriter().getWriter();
assertNotNull("New writer should not be null", writerAfterRotation);
assertTrue("Writer should have been rotated", writerAfterRotation !=
writerBeforeRotation);
@@ -460,23 +451,23 @@ public class ReplicationLogTest {
final Mutation put = LogFileTestUtil.newPut("row", 1, 10);
long commitId = 1L;
- LogFileWriter writerBeforeRotation = logWriter.getWriter();
+ LogFileWriter writerBeforeRotation =
logGroup.getActiveWriter().getWriter();
assertNotNull("Initial writer should not be null", writerBeforeRotation);
// Append enough data so that we exceed the size threshold.
for (int i = 0; i < 100; i++) {
- logWriter.append(tableName, commitId++, put);
+ logGroup.append(tableName, commitId++, put);
}
- logWriter.sync(); // Should trigger a sized based rotation
+ logGroup.sync(); // Should trigger a size-based rotation
// Get the new writer after the expected rotation.
- LogFileWriter writerAfterRotation = logWriter.getWriter();
+ LogFileWriter writerAfterRotation = logGroup.getActiveWriter().getWriter();
assertNotNull("New writer should not be null", writerAfterRotation);
assertTrue("Writer should have been rotated", writerAfterRotation !=
writerBeforeRotation);
// Append one more mutation to verify we're using the new writer.
- logWriter.append(tableName, commitId, put);
- logWriter.sync();
+ logGroup.append(tableName, commitId, put);
+ logGroup.sync();
// Verify the sequence of operations
InOrder inOrder = Mockito.inOrder(writerBeforeRotation,
writerAfterRotation);
@@ -501,21 +492,21 @@ public class ReplicationLogTest {
final long commitId = 1L;
// Get the inner writer
- LogFileWriter innerWriter = logWriter.getWriter();
+ LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
assertNotNull("Inner writer should not be null", innerWriter);
// Append some data
- logWriter.append(tableName, commitId, put);
+ logGroup.append(tableName, commitId, put);
// Close the log writer
- logWriter.close();
+ logGroup.close();
// Verify the inner writer was closed
verify(innerWriter, times(1)).close();
// Verify we can't append after close
try {
- logWriter.append(tableName, commitId + 1, put);
+ logGroup.append(tableName, commitId + 1, put);
fail("Expected append to fail after close");
} catch (IOException e) {
// Expected
@@ -523,14 +514,14 @@ public class ReplicationLogTest {
// Verify we can't sync after close
try {
- logWriter.sync();
+ logGroup.sync();
fail("Expected sync to fail after close");
} catch (IOException e) {
// Expected
}
// Verify we can close multiple times without error
- logWriter.close();
+ logGroup.close();
}
/**
@@ -543,16 +534,16 @@ public class ReplicationLogTest {
final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
long commitId = 1L;
- LogFileWriter writerBeforeRotation = logWriter.getWriter();
+ LogFileWriter writerBeforeRotation =
logGroup.getActiveWriter().getWriter();
assertNotNull("Initial writer should not be null", writerBeforeRotation);
// Append some data and wait for the rotation time to elapse plus a small
buffer.
- logWriter.append(tableName, commitId, put);
- logWriter.sync();
+ logGroup.append(tableName, commitId, put);
+ logGroup.sync();
Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
// Get the new writer after the rotation.
- LogFileWriter writerAfterRotation = logWriter.getWriter();
+ LogFileWriter writerAfterRotation = logGroup.getActiveWriter().getWriter();
assertNotNull("New writer should not be null", writerAfterRotation);
assertTrue("Writer should have been rotated", writerAfterRotation !=
writerBeforeRotation);
@@ -584,8 +575,10 @@ public class ReplicationLogTest {
final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
long commitId = 1L;
+ ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
// Get the initial writer
- LogFileWriter initialWriter = logWriter.getWriter();
+ LogFileWriter initialWriter = logGroupWriter.getWriter();
assertNotNull("Initial writer should not be null", initialWriter);
// Configure the log writer to fail only the first time when creating new
writers.
@@ -595,27 +588,27 @@ public class ReplicationLogTest {
throw new IOException("Simulated failure to create new writer");
}
return invocation.callRealMethod();
- }).when(logWriter).createNewWriter(any(FileSystem.class), any(URI.class));
+ }).when(logGroupWriter).createNewWriter();
// Append some data
- logWriter.append(tableName, commitId, put);
- logWriter.sync();
+ logGroup.append(tableName, commitId, put);
+ logGroup.sync();
// Rotate the log.
- LogFileWriter writerAfterFailedRotate =
logWriter.rotateLog(RotationReason.TIME);
+ LogFileWriter writerAfterFailedRotate =
logGroupWriter.rotateLog(RotationReason.TIME);
assertEquals("Should still be using the initial writer", initialWriter,
writerAfterFailedRotate);
// While rotation is failing, verify we can continue to use the current
writer.
- logWriter.append(tableName, commitId + 1, put);
- logWriter.sync();
+ logGroup.append(tableName, commitId + 1, put);
+ logGroup.sync();
- LogFileWriter writerAfterRotate = logWriter.rotateLog(RotationReason.TIME);
+ LogFileWriter writerAfterRotate =
logGroupWriter.rotateLog(RotationReason.TIME);
assertNotEquals("Should be using a new writer", initialWriter,
writerAfterRotate);
// Try to append more data. This should work with the new writer after
successful rotation.
- logWriter.append(tableName, commitId + 2, put);
- logWriter.sync();
+ logGroup.append(tableName, commitId + 2, put);
+ logGroup.sync();
// Verify operations went to the writers in the correct order
InOrder inOrder = Mockito.inOrder(initialWriter, writerAfterRotate);
@@ -640,23 +633,26 @@ public class ReplicationLogTest {
final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
long commitId = 1L;
- LogFileWriter initialWriter = logWriter.getWriter();
+ ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
+ // Get the initial writer
+ LogFileWriter initialWriter = logGroupWriter.getWriter();
assertNotNull("Initial writer should not be null", initialWriter);
// Configure the log writer to always fail when creating new writers
- doThrow(new IOException("Simulated failure to create new
writer")).when(logWriter)
- .createNewWriter(any(FileSystem.class), any(URI.class));
+ doThrow(new IOException("Simulated failure to create new
writer")).when(logGroupWriter)
+ .createNewWriter();
// Append some data
- logWriter.append(tableName, commitId, put);
- logWriter.sync();
+ logGroup.append(tableName, commitId, put);
+ logGroup.sync();
// Try to rotate the log multiple times until we exceed the retry limit
- for (int i = 0; i <=
ReplicationLog.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES; i++) {
+ for (int i = 0; i <=
ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES; i++) {
try {
- logWriter.rotateLog(RotationReason.TIME);
+ logGroupWriter.rotateLog(RotationReason.TIME);
} catch (IOException e) {
- if (i < ReplicationLog.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES) {
+ if (i < ReplicationLogGroup.DEFAULT_REPLICATION_LOG_ROTATION_RETRIES) {
// Not the last attempt yet, continue
continue;
}
@@ -670,8 +666,8 @@ public class ReplicationLogTest {
// Verify subsequent operations fail because the log is closed
try {
- logWriter.append(tableName, commitId + 1, put);
- logWriter.sync();
+ logGroup.append(tableName, commitId + 1, put);
+ logGroup.sync();
fail("Expected append to fail because log is closed");
} catch (IOException e) {
assertTrue("Expected an IOException because log is closed",
@@ -690,7 +686,7 @@ public class ReplicationLogTest {
final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
// Get the inner writer
- LogFileWriter innerWriter = logWriter.getWriter();
+ LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
assertNotNull("Writer should not be null", innerWriter);
// Configure writer to throw a RuntimeException on append
@@ -698,9 +694,9 @@ public class ReplicationLogTest {
anyLong(), any(Mutation.class));
// Append data. This should trigger the LogExceptionHandler, which will
close logWriter.
- logWriter.append(tableName, commitId, put);
+ logGroup.append(tableName, commitId, put);
try {
- logWriter.sync();
+ logGroup.sync();
fail("Should have thrown IOException because sync timed out");
} catch (IOException e) {
assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
@@ -708,7 +704,7 @@ public class ReplicationLogTest {
// Verify that subsequent operations fail because the log is closed
try {
- logWriter.append(tableName, commitId + 1, put);
+ logGroup.append(tableName, commitId + 1, put);
fail("Should have thrown IOException because log is closed");
} catch (IOException e) {
assertTrue("Expected an IOException because log is closed",
@@ -729,30 +725,31 @@ public class ReplicationLogTest {
final long commitId = 1L;
final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+ ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
// Get the initial writer
- LogFileWriter initialWriter = logWriter.getWriter();
+ LogFileWriter initialWriter = logGroupWriter.getWriter();
assertNotNull("Initial writer should not be null", initialWriter);
// Configure initial writer to fail on sync
doThrow(new IOException("Simulated sync
failure")).when(initialWriter).sync();
// createNewWriter should keep returning the bad writer
- doAnswer(invocation ->
initialWriter).when(logWriter).createNewWriter(any(FileSystem.class),
- any(URI.class));
+ doAnswer(invocation ->
initialWriter).when(logGroupWriter).createNewWriter();
// Append data
- logWriter.append(tableName, commitId, put);
+ logGroup.append(tableName, commitId, put);
// Try to sync. Should fail after exhausting retries.
try {
- logWriter.sync();
+ logGroup.sync();
fail("Expected sync to fail after exhausting retries");
} catch (IOException e) {
assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
}
// Each retry creates a new writer, so that is at least 1 create + 5
retries.
- verify(logWriter, atLeast(6)).createNewWriter(any(FileSystem.class),
any(URI.class));
+ verify(logGroupWriter, atLeast(6)).createNewWriter();
}
/**
@@ -766,24 +763,24 @@ public class ReplicationLogTest {
long commitId = 1L;
// Get the initial writer
- LogFileWriter writerBeforeRotation = logWriter.getWriter();
+ LogFileWriter writerBeforeRotation =
logGroup.getActiveWriter().getWriter();
assertNotNull("Initial writer should not be null", writerBeforeRotation);
// Append several items to fill currentBatch but don't sync yet
for (int i = 0; i < 5; i++) {
- logWriter.append(tableName, commitId + i, put);
+ logGroup.append(tableName, commitId + i, put);
}
// Force a rotation by waiting for rotation time to elapse
Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
// Get the new writer after rotation
- LogFileWriter writerAfterRotation = logWriter.getWriter();
+ LogFileWriter writerAfterRotation = logGroup.getActiveWriter().getWriter();
assertNotNull("New writer should not be null", writerAfterRotation);
assertTrue("Writer should have been rotated", writerAfterRotation !=
writerBeforeRotation);
// Now trigger a sync which should replay the currentBatch to the new
writer
- logWriter.sync();
+ logGroup.sync();
// Verify the sequence of operations
InOrder inOrder = Mockito.inOrder(writerBeforeRotation,
writerAfterRotation);
@@ -817,18 +814,20 @@ public class ReplicationLogTest {
final int NUM_RECORDS = 100;
List<LogFile.Record> originalRecords = new ArrayList<>();
+ ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
// Get the path of the log file.
- Path logPath = logWriter.getWriter().getContext().getFilePath();
+ Path logPath = logGroupWriter.getWriter().getContext().getFilePath();
for (int i = 0; i < NUM_RECORDS; i++) {
LogFile.Record record = LogFileTestUtil.newPutRecord(tableName, i, "row"
+ i, i, 1);
originalRecords.add(record);
- logWriter.append(record.getHBaseTableName(), record.getCommitId(),
record.getMutation());
+ logGroup.append(record.getHBaseTableName(), record.getCommitId(),
record.getMutation());
}
- logWriter.sync(); // Sync to commit the appends to the current writer.
+ logGroup.sync(); // Sync to commit the appends to the current writer.
// Force a rotation to close the current writer.
- logWriter.rotateLog(ReplicationLog.RotationReason.SIZE);
+ logGroupWriter.rotateLog(RotationReason.SIZE);
assertTrue("Log file should exist", localFs.exists(logPath));
@@ -869,10 +868,12 @@ public class ReplicationLogTest {
List<LogFile.Record> originalRecords = new ArrayList<>();
List<Path> logPaths = new ArrayList<>();
+ ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
// Write records across multiple rotations.
for (int rotation = 0; rotation < NUM_ROTATIONS; rotation++) {
// Get the path of the current log file.
- Path logPath = logWriter.getWriter().getContext().getFilePath();
+ Path logPath = logGroupWriter.getWriter().getContext().getFilePath();
logPaths.add(logPath);
for (int i = 0; i < NUM_RECORDS_PER_ROTATION; i++) {
@@ -880,11 +881,11 @@ public class ReplicationLogTest {
LogFile.Record record =
LogFileTestUtil.newPutRecord(tableName, commitId, "row" + commitId,
commitId, 1);
originalRecords.add(record);
- logWriter.append(record.getHBaseTableName(), record.getCommitId(),
record.getMutation());
+ logGroup.append(record.getHBaseTableName(), record.getCommitId(),
record.getMutation());
}
- logWriter.sync(); // Sync to commit the appends to the current writer.
+ logGroup.sync(); // Sync to commit the appends to the current writer.
// Force a rotation to close the current writer.
- logWriter.rotateLog(ReplicationLog.RotationReason.SIZE);
+ logGroupWriter.rotateLog(RotationReason.SIZE);
}
// Verify all log files exist
@@ -934,10 +935,12 @@ public class ReplicationLogTest {
List<LogFile.Record> originalRecords = new ArrayList<>();
List<Path> logPaths = new ArrayList<>();
+ ReplicationLogGroupWriter logGroupWriter = logGroup.getActiveWriter();
+
// Write records across multiple rotations, only syncing 50% of the time.
for (int rotation = 0; rotation < NUM_ROTATIONS; rotation++) {
// Get the path of the current log file.
- Path logPath = logWriter.getWriter().getContext().getFilePath();
+ Path logPath = logGroupWriter.getWriter().getContext().getFilePath();
logPaths.add(logPath);
for (int i = 0; i < NUM_RECORDS_PER_ROTATION; i++) {
@@ -945,16 +948,16 @@ public class ReplicationLogTest {
LogFile.Record record =
LogFileTestUtil.newPutRecord(tableName, commitId, "row" + commitId,
commitId, 1);
originalRecords.add(record);
- logWriter.append(record.getHBaseTableName(), record.getCommitId(),
record.getMutation());
+ logGroup.append(record.getHBaseTableName(), record.getCommitId(),
record.getMutation());
}
// Only sync 50% of the time before rotation. To ensure we sync on the
last file
// we are going to write, use 'rotation % 2 == 1' instead of 'rotation %
2 == 0'.
if (rotation % 2 == 1) {
- logWriter.sync(); // Sync to commit the appends to the current writer.
+ logGroup.sync(); // Sync to commit the appends to the current writer.
}
// Force a rotation to close the current writer.
- logWriter.rotateLog(ReplicationLog.RotationReason.SIZE);
+ logGroupWriter.rotateLog(RotationReason.SIZE);
}
// Verify all log files exist
@@ -999,17 +1002,17 @@ public class ReplicationLogTest {
final long commitId = 1L;
final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
- // Get the inner writer
- LogFileWriter innerWriter = logWriter.getWriter();
- assertNotNull("Writer should not be null", innerWriter);
+ // Get the initial writer
+ LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
+ assertNotNull("Inner writer should not be null", innerWriter);
// Configure writer to throw RuntimeException on getLength()
doThrow(new RuntimeException("Simulated critical
error")).when(innerWriter).getLength();
// Append data. This should trigger the LogExceptionHandler, which will
close logWriter.
- logWriter.append(tableName, commitId, put);
+ logGroup.append(tableName, commitId, put);
try {
- logWriter.sync();
+ logGroup.sync();
fail("Should have thrown IOException because sync timed out");
} catch (IOException e) {
assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
@@ -1017,7 +1020,7 @@ public class ReplicationLogTest {
// Verify that subsequent operations fail because the log is closed
try {
- logWriter.append(tableName, commitId + 1, put);
+ logGroup.append(tableName, commitId + 1, put);
fail("Should have thrown IOException because log is closed");
} catch (IOException e) {
assertTrue("Expected an IOException because log is closed",
@@ -1039,7 +1042,7 @@ public class ReplicationLogTest {
final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
// Get the inner writer
- LogFileWriter innerWriter = logWriter.getWriter();
+ LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
assertNotNull("Writer should not be null", innerWriter);
// Configure writer to throw RuntimeException on append
@@ -1047,9 +1050,9 @@ public class ReplicationLogTest {
anyLong(), any(Mutation.class));
// Append data to trigger closeOnError()
- logWriter.append(tableName, commitId, put);
+ logGroup.append(tableName, commitId, put);
try {
- logWriter.sync();
+ logGroup.sync();
fail("Should have thrown IOException because sync timed out");
} catch (IOException e) {
assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
@@ -1057,7 +1060,7 @@ public class ReplicationLogTest {
// Verify that subsequent append operations fail because the log is closed
try {
- logWriter.append(tableName, commitId, put);
+ logGroup.append(tableName, commitId, put);
fail("Should have thrown IOException because log is closed");
} catch (IOException e) {
assertTrue("Expected an IOException because log is closed",
@@ -1079,7 +1082,7 @@ public class ReplicationLogTest {
final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
// Get the inner writer
- LogFileWriter innerWriter = logWriter.getWriter();
+ LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
assertNotNull("Writer should not be null", innerWriter);
// Configure writer to throw RuntimeException on append
@@ -1087,9 +1090,9 @@ public class ReplicationLogTest {
anyLong(), any(Mutation.class));
// Append data to trigger closeOnError()
- logWriter.append(tableName, commitId, put);
+ logGroup.append(tableName, commitId, put);
try {
- logWriter.sync();
+ logGroup.sync();
fail("Should have thrown IOException because sync timed out");
} catch (IOException e) {
assertTrue("Expected timeout exception", e.getCause() instanceof
TimeoutException);
@@ -1097,7 +1100,7 @@ public class ReplicationLogTest {
// Verify that subsequent sync operations fail because the log is closed
try {
- logWriter.sync();
+ logGroup.sync();
fail("Should have thrown IOException because log is closed");
} catch (IOException e) {
assertTrue("Expected an IOException because log is closed",
@@ -1108,175 +1111,6 @@ public class ReplicationLogTest {
verify(innerWriter, times(1)).close();
}
- /**
- * Tests race condition between LogRotationTask and LogEventHandler when
both try to rotate the
- * writer simultaneously. Verifies that despite concurrent rotation
attempts, the log is only
- * rotated once. Uses latches to ensure true concurrency and verify the
sequence of operations.
- */
- @Test
- public void testConcurrentRotationAttempts() throws Exception {
- final String tableName = "TBLCR";
- final long commitId = 1L;
- final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
-
- // Get the initial writer
- LogFileWriter initialWriter = logWriter.getWriter();
- assertNotNull("Initial writer should not be null", initialWriter);
-
- // Create latches to control timing and track rotation attempts
- final CountDownLatch rotationTaskStarted = new CountDownLatch(1);
- final CountDownLatch rotationTaskCanProceed = new CountDownLatch(1);
- final CountDownLatch eventHandlerStarted = new CountDownLatch(1);
- final CountDownLatch eventHandlerCanProceed = new CountDownLatch(1);
- final AtomicInteger rotationCount = new AtomicInteger(0);
- final CountDownLatch bothAttemptsStarted = new CountDownLatch(2);
-
- // Configure the rotation task to pause at specific points and track
attempts
- doAnswer(invocation -> {
- rotationTaskStarted.countDown(); // Signal that rotation task has started
- bothAttemptsStarted.countDown(); // Signal that this attempt has started
- rotationTaskCanProceed.await(); // Wait for permission to proceed
- rotationCount.incrementAndGet(); // Track this rotation attempt
- return invocation.callRealMethod();
- }).when(logWriter).rotateLog(ReplicationLog.RotationReason.TIME);
-
- // Configure the event handler to pause at specific points
- doAnswer(invocation -> {
- eventHandlerStarted.countDown(); // Signal that event handler has started
- bothAttemptsStarted.countDown(); // Signal that this attempt has started
- eventHandlerCanProceed.await(); // Wait for permission to proceed
- return invocation.callRealMethod();
- }).when(logWriter).getWriter();
-
- // Start a thread that will trigger rotation via the background task
- Thread rotationThread = new Thread(() -> {
- try {
- // Force rotation by waiting for rotation time
- Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- });
- rotationThread.start();
-
- // Start append operation in main thread
- logWriter.append(tableName, commitId, put);
-
- // Wait for both attempts to start - this ensures true concurrency
- assertTrue("Both rotation attempts should start",
- bothAttemptsStarted.await(5, TimeUnit.SECONDS));
-
- // Verify both attempts have started before proceeding
- assertEquals("Both attempts should have started", 0,
bothAttemptsStarted.getCount());
-
- // Allow both operations to proceed simultaneously
- eventHandlerCanProceed.countDown();
- rotationTaskCanProceed.countDown();
-
- // Wait for both operations to complete
- rotationThread.join();
-
- // Verify the final state
- LogFileWriter finalWriter = logWriter.getWriter();
- assertNotNull("Final writer should not be null", finalWriter);
- assertTrue("Writer should have been rotated", finalWriter !=
initialWriter);
-
- // Verify only one rotation actually occurred
- assertEquals("Should have only one actual rotation", 1,
rotationCount.get());
-
- // Verify all operations completed successfully
- logWriter.sync();
-
- // Verify the sequence of operations through the latches
- assertTrue("Rotation task should have started",
rotationTaskStarted.getCount() == 0);
- assertTrue("Event handler should have started",
eventHandlerStarted.getCount() == 0);
- }
-
- /**
- * Tests race condition between LogEventHandler retry loop and
LogRotationTask when both try to
- * rotate the writer simultaneously. Verifies that despite concurrent
rotation attempts during a
- * retry scenario, the log is only rotated once. Uses latches to ensure true
concurrency and
- * verify the sequence of operations.
- */
- @Test
- public void testConcurrentRotationDuringRetry() throws Exception {
- final String tableName = "TBLCRR";
- final long commitId = 1L;
- final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
-
- // Get the initial writer
- LogFileWriter initialWriter = logWriter.getWriter();
- assertNotNull("Initial writer should not be null", initialWriter);
-
- // Create latches to control timing and track rotation attempts
- final CountDownLatch retryStarted = new CountDownLatch(1);
- final CountDownLatch retryCanProceed = new CountDownLatch(1);
- final CountDownLatch rotationTaskStarted = new CountDownLatch(1);
- final CountDownLatch rotationTaskCanProceed = new CountDownLatch(1);
- final AtomicInteger rotationCount = new AtomicInteger(0);
- final CountDownLatch bothAttemptsStarted = new CountDownLatch(2);
-
- // Configure the writer to fail on first append, succeed on retry
- doAnswer(invocation -> {
- retryStarted.countDown(); // Signal that retry has started
- bothAttemptsStarted.countDown(); // Signal that this attempt has started
- retryCanProceed.await(); // Wait for permission to proceed
- return invocation.callRealMethod();
- }).when(initialWriter).append(anyString(), anyLong(), any(Mutation.class));
-
- // Configure the rotation task to pause at specific points and track
attempts
- doAnswer(invocation -> {
- rotationTaskStarted.countDown(); // Signal that rotation task has started
- bothAttemptsStarted.countDown(); // Signal that this attempt has started
- rotationTaskCanProceed.await(); // Wait for permission to proceed
- rotationCount.incrementAndGet(); // Track this rotation attempt
- return invocation.callRealMethod();
- }).when(logWriter).rotateLog(ReplicationLog.RotationReason.TIME);
-
- // Start a thread that will trigger rotation via the background task
- Thread rotationThread = new Thread(() -> {
- try {
- // Force rotation by waiting for rotation time
- Thread.sleep((long) (TEST_ROTATION_TIME * 1.25));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- });
- rotationThread.start();
-
- // Start append operation in main thread
- logWriter.append(tableName, commitId, put);
-
- // Wait for both attempts to start - this ensures true concurrency
- assertTrue("Both rotation attempts should start",
- bothAttemptsStarted.await(5, TimeUnit.SECONDS));
-
- // Verify both attempts have started before proceeding
- assertEquals("Both attempts should have started", 0,
bothAttemptsStarted.getCount());
-
- // Allow both operations to proceed simultaneously
- retryCanProceed.countDown();
- rotationTaskCanProceed.countDown();
-
- // Wait for both operations to complete
- rotationThread.join();
-
- // Verify the final state
- LogFileWriter finalWriter = logWriter.getWriter();
- assertNotNull("Final writer should not be null", finalWriter);
- assertTrue("Writer should have been rotated", finalWriter !=
initialWriter);
-
- // Verify only one rotation actually occurred
- assertEquals("Should have only one actual rotation", 1,
rotationCount.get());
-
- // Verify all operations completed successfully
- logWriter.sync();
-
- // Verify the sequence of operations through the latches
- assertTrue("Retry should have started", retryStarted.getCount() == 0);
- assertTrue("Rotation task should have started",
rotationTaskStarted.getCount() == 0);
- }
-
/**
* Tests that multiple sync requests are consolidated into a single sync
operation on the inner
* writer when they occur in quick succession. Verifies that the Disruptor
batching and
@@ -1293,7 +1127,7 @@ public class ReplicationLogTest {
final Mutation put3 = LogFileTestUtil.newPut("row3", 3, 1);
final long commitId3 = 3L;
- LogFileWriter innerWriter = logWriter.getWriter();
+ LogFileWriter innerWriter = logGroup.getActiveWriter().getWriter();
assertNotNull("Inner writer should not be null", innerWriter);
// Configure writer to briefly hold up the LogEventHandler upon first
append.
@@ -1308,12 +1142,12 @@ public class ReplicationLogTest {
// Post appends and three syncs in quick succession. The first append will
be delayed long
// enough for the three syncs to appear in a single Disruptor batch. Then
they should all
// be consolidated into a single sync.
- logWriter.append(tableName, commitId1, put1);
- logWriter.sync();
- logWriter.append(tableName, commitId2, put2);
- logWriter.sync();
- logWriter.append(tableName, commitId3, put3);
- logWriter.sync();
+ logGroup.append(tableName, commitId1, put1);
+ logGroup.sync();
+ logGroup.append(tableName, commitId2, put2);
+ logGroup.sync();
+ logGroup.append(tableName, commitId3, put3);
+ logGroup.sync();
// Verify the sequence of operations on the inner writer: the three
appends, then exactly
// one sync.
@@ -1324,21 +1158,102 @@ public class ReplicationLogTest {
inOrder.verify(innerWriter, times(1)).sync(); // Only one sync should be
called
}
- static class TestableReplicationLog extends ReplicationLog {
+ /**
+ * Tests that ReplicationLogGroup.get() returns the same instance for the
same haGroupId. Verifies
+ * that multiple calls with the same parameters return the cached instance.
+ */
+ @Test
+ public void testReplicationLogGroupCaching() throws Exception {
+ final String haGroupId1 = "testHAGroup1";
+ final String haGroupId2 = "testHAGroup2";
+
+ // Get instances for the first HA group
+ ReplicationLogGroup g1_1 = ReplicationLogGroup.get(conf, serverName,
haGroupId1);
+ ReplicationLogGroup g1_2 = ReplicationLogGroup.get(conf, serverName,
haGroupId1);
+
+ // Verify same instance is returned for same haGroupId
+ assertNotNull("ReplicationLogGroup should not be null", g1_1);
+ assertNotNull("ReplicationLogGroup should not be null", g1_2);
+ assertTrue("Same instance should be returned for same haGroupId", g1_2 ==
g1_1);
+ assertEquals("HA Group name should match", haGroupId1,
g1_1.getHaGroupName());
+
+ // Get instance for a different HA group
+ ReplicationLogGroup g2_1 = ReplicationLogGroup.get(conf, serverName,
haGroupId2);
+ assertNotNull("ReplicationLogGroup should not be null", g2_1);
+ assertTrue("Different instance should be returned for different
haGroupId", g2_1 != g1_1);
+ assertEquals("HA Group name should match", haGroupId2,
g2_1.getHaGroupName());
+
+ // Verify multiple calls still return cached instances
+ ReplicationLogGroup g1_3 = ReplicationLogGroup.get(conf, serverName,
haGroupId1);
+ ReplicationLogGroup g2_2 = ReplicationLogGroup.get(conf, serverName,
haGroupId2);
+ assertTrue("Cached instance should be returned", g1_3 == g1_1);
+ assertTrue("Cached instance should be returned", g2_2 == g2_1);
+
+ // Clean up
+ g1_1.close();
+ g2_1.close();
+ }
+
+ /**
+ * Tests that close() removes the instance from the cache. Verifies that
after closing, a new call
+ * to get() creates a new instance.
+ */
+ @Test
+ public void testReplicationLogGroupCacheRemovalOnClose() throws Exception {
+ final String haGroupId = "testHAGroupCacheRemoval";
+
+ // Get initial instance
+ ReplicationLogGroup g1_1 = ReplicationLogGroup.get(conf, serverName,
haGroupId);
+ assertNotNull("ReplicationLogGroup should not be null", g1_1);
+ assertFalse("Group should not be closed initially", g1_1.isClosed());
+
+ // Verify cached instance is returned
+ ReplicationLogGroup g1_2 = ReplicationLogGroup.get(conf, serverName,
haGroupId);
+ assertTrue("Same instance should be returned before close", g1_2 == g1_1);
+
+ // Close the group
+ g1_1.close();
+ assertTrue("Group should be closed", g1_1.isClosed());
+
+ // Get instance after close - should be a new instance
+ ReplicationLogGroup g1_3 = ReplicationLogGroup.get(conf, serverName,
haGroupId);
+ assertNotNull("ReplicationLogGroup should not be null after close", g1_3);
+ assertFalse("New group should not be closed", g1_3.isClosed());
+ assertTrue("New instance should be created after close", g1_1 != g1_3);
+ assertEquals("HA Group name should match", haGroupId,
g1_3.getHaGroupName());
+
+ // Clean up
+ g1_3.close();
+ }
- protected TestableReplicationLog(Configuration conf, ServerName
serverName) {
- super(conf, serverName);
+ static class TestableLogGroup extends ReplicationLogGroup {
+
+ public TestableLogGroup(Configuration conf, ServerName serverName, String
haGroupName) {
+ super(conf, serverName, haGroupName);
}
@Override
- protected LogFileWriter createNewWriter(FileSystem fs, URI url) throws
IOException {
- return spy(super.createNewWriter(fs, url));
+ protected ReplicationLogGroupWriter createRemoteWriter() throws
IOException {
+ ReplicationLogGroupWriter writer = spy(new
TestableStandbyLogGroupWriter(this));
+ writer.init();
+ return writer;
+ }
+
+ }
+
+ /**
+ * Testable version of StandbyLogGroupWriter that allows spying on writers.
+ */
+ static class TestableStandbyLogGroupWriter extends StandbyLogGroupWriter {
+
+ protected TestableStandbyLogGroupWriter(ReplicationLogGroup logGroup) {
+ super(logGroup);
}
@Override
- protected MetricsReplicationLogSource createMetricsSource() {
- return new MetricsReplicationLogSourceImpl();
+ protected LogFileWriter createNewWriter() throws IOException {
+ LogFileWriter writer = super.createNewWriter();
+ return spy(writer);
}
}
-
}
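[Editor's note] Stepping back from the diff, the API surface exercised by ReplicationLogGroupTest reduces to get/append/sync/close. A minimal caller-side sketch, assuming get() returns an initialized, cached group (as the caching tests suggest) and using placeholder table, group, and standby URL values:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.replication.ReplicationLogGroup;

public class ReplicationLogGroupUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY,
        "hdfs://standby:8020/phoenix/replication"); // placeholder standby URL
    ServerName serverName = ServerName.valueOf("rs1.example.com", 60020,
        System.currentTimeMillis());
    // get() returns a cached instance per HA group id; close() removes it from the cache.
    ReplicationLogGroup logGroup = ReplicationLogGroup.get(conf, serverName, "groupA");
    try {
      Put put = new Put(Bytes.toBytes("row1"));
      put.addColumn(Bytes.toBytes("0"), Bytes.toBytes("c"), Bytes.toBytes("v"));
      logGroup.append("MY_TABLE", 1L, put); // buffer a mutation under its commit id
      logGroup.sync();                      // block until the buffered appends are durable
    } finally {
      logGroup.close();
    }
  }
}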