This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new b653b8e465 PHOENIX-7854 Use Abortable interface to trigger RS aborts
(#2474)
b653b8e465 is described below
commit b653b8e4659ba4832bd9cb099b63d8e2f05bca6c
Author: tkhurana <[email protected]>
AuthorDate: Mon May 18 15:48:18 2026 -0700
PHOENIX-7854 Use Abortable interface to trigger RS aborts (#2474)
---
.../coprocessor/PhoenixRegionServerEndpoint.java | 11 +-
.../phoenix/hbase/index/IndexRegionObserver.java | 13 +-
.../replication/ReplicationLogDiscovery.java | 1 +
.../phoenix/replication/ReplicationLogGroup.java | 245 +++++++++++----------
.../phoenix/replication/ReplicationModeImpl.java | 16 ++
.../replication/StoreAndForwardModeImpl.java | 6 +-
.../replication/SyncAndForwardModeImpl.java | 13 +-
.../apache/phoenix/replication/SyncModeImpl.java | 14 +-
.../replication/reader/ReplicationLogReplay.java | 1 -
.../replication/ReplicationLogGroupTest.java | 118 +++++-----
10 files changed, 229 insertions(+), 209 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index 3f35b40cef..f821e6995a 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -32,8 +32,11 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;
@@ -62,12 +65,14 @@ import org.slf4j.LoggerFactory;
/**
* This is first implementation of RegionServer coprocessor introduced by
Phoenix.
*/
+@CoreCoprocessor
public class PhoenixRegionServerEndpoint extends
RegionServerEndpointProtos.RegionServerEndpointService implements
RegionServerCoprocessor {
private static final Logger LOGGER =
LoggerFactory.getLogger(PhoenixRegionServerEndpoint.class);
private MetricsMetadataCachingSource metricsSource;
protected Configuration conf;
protected ServerName serverName;
+ private Abortable abortable;
private ExecutorService prewarmExecutor;
// regionserver level thread pool used by Uncovered Indexes to scan data
table rows
@@ -79,6 +84,10 @@ public class PhoenixRegionServerEndpoint extends
if (env instanceof RegionServerCoprocessorEnvironment) {
this.serverName = ((RegionServerCoprocessorEnvironment)
env).getServerName();
}
+ // @CoreCoprocessor guarantees HasRegionServerServices, but guard for
testability
+ if (env instanceof HasRegionServerServices) {
+ this.abortable = ((HasRegionServerServices)
env).getRegionServerServices();
+ }
this.metricsSource =
MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource();
initUncoveredIndexThreadPool(this.conf);
@@ -308,7 +317,7 @@ public class PhoenixRegionServerEndpoint extends
manager.getClusterRoleRecord(haGroup);
if (shouldInitReplicationLogGroup) {
try {
- ReplicationLogGroup.get(conf, serverName, haGroup);
+ ReplicationLogGroup.get(conf, serverName, haGroup,
abortable);
LOGGER.info("Eagerly initialized ReplicationLogGroup {} on
server {}", haGroup,
serverName);
} catch (Exception e) {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 1ad0579a45..b968914731 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellScanner;
@@ -70,6 +71,8 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -170,6 +173,7 @@ import
org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
* does batch mutations.
* <p>
*/
+@CoreCoprocessor
public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
private static final Logger LOG =
LoggerFactory.getLogger(IndexRegionObserver.class);
@@ -439,6 +443,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS =
100;
private byte[] encodedRegionName;
private boolean shouldReplicate;
+ private Abortable abortable;
// Don't replicate the mutation if this attribute is set
private static final Predicate<Mutation> IGNORE_REPLICATION =
@@ -541,6 +546,10 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
if (this.shouldReplicate) {
this.ignoreReplicationFilter =
getSynchronousReplicationFilter(tableName);
}
+ // @CoreCoprocessor guarantees HasRegionServerServices, but guard for
testability
+ if (e instanceof HasRegionServerServices) {
+ this.abortable = ((HasRegionServerServices)
e).getRegionServerServices();
+ }
} catch (NoSuchMethodError ex) {
disabled = true;
LOG.error("Must be too early a version of HBase. Disabled coprocessor ",
ex);
@@ -689,7 +698,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
byte[] haGroupName =
m.getAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
if (haGroupName != null) {
ReplicationLogGroup logGroup =
ReplicationLogGroup.get(env.getConfiguration(),
- env.getServerName(), Bytes.toString(haGroupName));
+ env.getServerName(), Bytes.toString(haGroupName), abortable);
return Optional.of(logGroup);
}
}
@@ -707,7 +716,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
logKey.getExtendedAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
if (haGroupName != null) {
ReplicationLogGroup logGroup =
ReplicationLogGroup.get(env.getConfiguration(),
- env.getServerName(), Bytes.toString(haGroupName));
+ env.getServerName(), Bytes.toString(haGroupName), abortable);
return Optional.of(logGroup);
}
return Optional.empty();
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
index 3c64e9ec61..ccd0e82caf 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
@@ -125,6 +125,7 @@ public abstract class ReplicationLogDiscovery {
}
public void close() {
+ replicationLogTracker.close();
if (this.metrics != null) {
this.metrics.close();
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index ded1fec7b1..1fe559b449 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -56,6 +56,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.phoenix.jdbc.HAGroupStoreManager;
@@ -67,7 +68,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.phoenix.thirdparty.com.google.common.base.Throwables;
import
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@@ -171,6 +171,9 @@ public class ReplicationLogGroup {
public static final long DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS =
10_000L;
public static final String WAL_SYNC_TIMEOUT_MS_KEY =
"hbase.regionserver.wal.sync.timeout";
public static final long DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000L;
+ public static final String REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS_KEY =
+ "phoenix.replication.log.group.shutdown.timeout.ms";
+ public static final long DEFAULT_REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS =
30_000L;
public static final String STANDBY_DIR = "in";
public static final String FALLBACK_DIR = "out";
@@ -193,7 +196,9 @@ public class ReplicationLogGroup {
protected ReplicationLogDiscoveryForwarder logForwarder;
protected long syncTimeoutMs;
protected long peerInitTimeoutMs;
- protected volatile boolean closed = false;
+ protected final AtomicBoolean closed = new AtomicBoolean(false);
+ protected final Abortable abortable;
+ protected long shutdownTimeoutMs;
/**
* The replication mode determines how mutations are handled. Mode
transitions occur automatically
@@ -331,8 +336,6 @@ public class ReplicationLogGroup {
protected Disruptor<LogEvent> disruptor;
protected RingBuffer<LogEvent> ringBuffer;
protected LogEventHandler eventHandler;
- // Used to inform the disruptor event thread whether this is a graceful or a
forced shutdown
- private final AtomicBoolean gracefulShutdownEventHandlerFlag = new
AtomicBoolean();
/**
* Get or create a ReplicationLogGroup instance for the given HA Group.
@@ -344,10 +347,25 @@ public class ReplicationLogGroup {
*/
public static ReplicationLogGroup get(Configuration conf, ServerName
serverName,
String haGroupName) throws IOException {
+ return get(conf, serverName, haGroupName, (Abortable) null);
+ }
+
+ /**
+ * Get or create a ReplicationLogGroup instance for the given HA Group.
+ * @param conf Configuration object
+ * @param serverName The server name
+ * @param haGroupName The HA Group name
+ * @param abortable Abortable to invoke on fatal errors (typically
RegionServerServices)
+ * @return ReplicationLogGroup instance
+ * @throws IOException if initialization fails
+ */
+ public static ReplicationLogGroup get(Configuration conf, ServerName
serverName,
+ String haGroupName, Abortable abortable) throws IOException {
try {
return INSTANCES.computeIfAbsent(haGroupName, k -> {
try {
- ReplicationLogGroup group = new ReplicationLogGroup(conf,
serverName, haGroupName);
+ ReplicationLogGroup group = new ReplicationLogGroup(conf,
serverName, haGroupName,
+ HAGroupStoreManager.getInstance(conf), abortable);
group.init();
return group;
} catch (IOException e) {
@@ -390,12 +408,14 @@ public class ReplicationLogGroup {
/**
* Protected constructor for ReplicationLogGroup.
- * @param conf Configuration object
- * @param serverName The server name
- * @param haGroupName The HA Group name
+ * @param conf Configuration object
+ * @param serverName The server name
+ * @param haGroupName The HA Group name
+ * @param haGroupStoreManager HA Group Store Manager instance
*/
- protected ReplicationLogGroup(Configuration conf, ServerName serverName,
String haGroupName) {
- this(conf, serverName, haGroupName, HAGroupStoreManager.getInstance(conf));
+ protected ReplicationLogGroup(Configuration conf, ServerName serverName,
String haGroupName,
+ HAGroupStoreManager haGroupStoreManager) {
+ this(conf, serverName, haGroupName, haGroupStoreManager, null);
}
/**
@@ -404,9 +424,10 @@ public class ReplicationLogGroup {
* @param serverName The server name
* @param haGroupName The HA Group name
* @param haGroupStoreManager HA Group Store Manager instance
+ * @param abortable Abortable to invoke on fatal errors (may be
null)
*/
protected ReplicationLogGroup(Configuration conf, ServerName serverName,
String haGroupName,
- HAGroupStoreManager haGroupStoreManager) {
+ HAGroupStoreManager haGroupStoreManager, Abortable abortable) {
// conf object from coprocessor is instance of
// org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration and we need
to modify it when
// we send rpc to namenode so copying it
@@ -420,6 +441,9 @@ public class ReplicationLogGroup {
this.serverName = serverName;
this.haGroupName = haGroupName;
this.haGroupStoreManager = haGroupStoreManager;
+ this.abortable = abortable;
+ this.shutdownTimeoutMs =
clonedConf.getLong(REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS_KEY,
+ DEFAULT_REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS);
this.metrics = createMetricsSource();
this.mode = new AtomicReference<>(INIT);
}
@@ -517,9 +541,13 @@ public class ReplicationLogGroup {
if (LOG.isTraceEnabled()) {
LOG.trace("Append: table={}, commitId={}, mutation={}", tableName,
commitId, mutation);
}
- if (closed) {
+ if (isClosed()) {
throw new IOException("Closed");
}
+ IOException fatal = eventHandler.getFatalException();
+ if (fatal != null) {
+ throw fatal;
+ }
long startTime = System.nanoTime();
try {
// ringBuffer.next() claims the next sequence number. Because we
initialize the Disruptor
@@ -557,9 +585,13 @@ public class ReplicationLogGroup {
if (LOG.isTraceEnabled()) {
LOG.trace("Sync");
}
- if (closed) {
+ if (isClosed()) {
throw new IOException("Closed");
}
+ IOException fatal = eventHandler.getFatalException();
+ if (fatal != null) {
+ abort("ReplicationLogGroup has a fatal exception", fatal);
+ }
long startTime = System.nanoTime();
try {
syncInternal();
@@ -589,16 +621,15 @@ public class ReplicationLogGroup {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Interrupted while waiting for sync");
} catch (ExecutionException e) {
- // After exhausting all attempts to sync to the standby cluster we
switch mode
- // and then retry again. If that also fails, it is a fatal error
+ Throwable cause = e.getCause();
String message = String.format("HAGroup %s sync operation failed", this);
- LOG.error(message, e);
- abort(message, e);
+ LOG.error(message, cause);
+ abort(message, cause);
} catch (TimeoutException e) {
String message =
String.format("HAGroup %s replication log sync timed out after %d ms",
this, syncTimeoutMs);
LOG.error(message, e);
- PhoenixWALSyncTimeoutException timeoutEx = new
PhoenixWALSyncTimeoutException(message, e);
+ PhoenixWALSyncTimeoutException timeoutEx = new
PhoenixWALSyncTimeoutException(message);
abort(message, timeoutEx);
}
}
@@ -608,67 +639,33 @@ public class ReplicationLogGroup {
* @return true if closed, false otherwise
*/
public boolean isClosed() {
- return closed;
- }
-
- /**
- * Force closes the log group upon an unrecoverable internal error. This is
a fail-stop behavior:
- * once called, the log group is marked as closed, the Disruptor is halted,
and all subsequent
- * append() and sync() calls will throw an IOException("Closed"). This
ensures that no further
- * operations are attempted on a log group that has encountered a critical
error.
- */
- protected void closeOnError() {
- if (closed) {
- return;
- }
- synchronized (this) {
- if (closed) {
- return;
- }
- // setting closed to true prevents future producers to add events to the
ring buffer
- closed = true;
- }
- // Directly halt the disruptor. shutdown() would wait for events to drain.
We are expecting
- // that will not work.
- gracefulShutdownEventHandlerFlag.set(false);
- disruptor.halt();
- metrics.close();
- LOG.info("HAGroup {} closed on error", this);
+ return closed.get();
}
/**
- * Close the ReplicationLogGroup and all associated resources. This method
is thread-safe and can
- * be called multiple times.
+ * Close the ReplicationLogGroup. Drains pending events from the Disruptor
with a bounded timeout
+ * (which triggers onShutdown → modeImpl.onExit → ReplicationLog.close),
then cleans up all
+ * resources. When the event handler has a fatal exception the drain
completes instantly because
+ * each event hits the short-circuit path. The instance is removed from the
INSTANCES cache and
+ * subsequent append()/sync() calls throw IOException.
*/
public void close() {
- if (closed) {
+ if (!closed.compareAndSet(false, true)) {
return;
}
- synchronized (this) {
- if (closed) {
- return;
- }
- LOG.info("Closing HAGroup {}", this);
- // setting closed to true prevents future producers to add events to the
ring buffer
- closed = true;
- // Remove from instances cache
- INSTANCES.remove(haGroupName);
- // Sync before shutting down to flush all pending appends.
- try {
- syncInternal();
- gracefulShutdownEventHandlerFlag.set(true);
- // waits until all the events in the disruptor have been processed
- disruptor.shutdown();
- } catch (IOException e) {
- LOG.warn("Error during final sync on close", e);
- gracefulShutdownEventHandlerFlag.set(false);
- disruptor.halt(); // Go directly to halt.
- }
- // wait for the disruptor threads to finish
- shutdownDisruptorExecutor();
- metrics.close();
- LOG.info("HAGroup {} closed", this);
+ LOG.info("Closing HAGroup {}", this);
+ INSTANCES.remove(haGroupName);
+ try {
+ disruptor.shutdown(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (com.lmax.disruptor.TimeoutException e) {
+ LOG.warn("HAGroup {} shutdown timed out after {}ms, halting", this,
shutdownTimeoutMs);
+ disruptor.halt();
}
+ shutdownDisruptorExecutor();
+ logForwarder.stop();
+ logForwarder.close();
+ metrics.close();
+ LOG.info("HAGroup {} closed", this);
}
/**
@@ -902,20 +899,33 @@ public class ReplicationLogGroup {
}
/**
- * Abort when we hit a fatal error
+ * Returns the given cause as an IOException. If it already is one, returns it
directly; otherwise
+ * wraps it (or creates a new IOException from the message when the cause is null).
*/
- protected void abort(String reason, Throwable cause) {
- // TODO better to use abort using RegionServerServices
- String msg = "***** ABORTING region server: " + reason + " *****";
- if (cause != null) {
- msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
+ static IOException asIOException(String msg, Throwable cause) {
+ if (cause instanceof IOException) {
+ return (IOException) cause;
}
- LOG.error(msg);
- if (cause != null) {
- throw new RuntimeException(msg, cause);
- } else {
- throw new RuntimeException(msg);
+ return cause != null ? new IOException(msg, cause) : new IOException(msg);
+ }
+
+ /**
+ * Abort the region server due to an unrecoverable replication failure. Must
be called from a
+ * producer thread (never the event handler thread). Sets the fatal
exception so remaining events
+ * drain instantly, then closes all resources and invokes the Abortable to
trigger RS shutdown.
+ * Always throws so the caller unwinds.
+ */
+ protected void abort(String reason, Throwable cause) throws IOException {
+ String msg = "Aborting region server due to replication failure: " +
reason;
+ LOG.error(msg, cause);
+ // Idempotent; may already be set by the event handler (ExecutionException
path)
+ // but is needed here for the TimeoutException path where the event
handler is still alive.
+ eventHandler.setFatalException(asIOException(msg, cause));
+ close();
+ if (abortable != null) {
+ abortable.abort(msg, cause);
}
+ throw asIOException(msg, cause);
}
/**
@@ -923,12 +933,24 @@ public class ReplicationLogGroup {
*/
protected class LogEventHandler implements EventHandler<LogEvent>,
LifecycleAware {
private final List<CompletableFuture<Void>> pendingSyncFutures = new
ArrayList<>();
- // Current replication mode implementation which will handle the events
private ReplicationModeImpl currentModeImpl;
+ private volatile IOException fatalException;
public LogEventHandler() {
}
+ void setFatalException(IOException cause) {
+ if (fatalException != null) {
+ return;
+ }
+ LOG.error("HAGroup {} event handler hit fatal exception",
ReplicationLogGroup.this, cause);
+ this.fatalException = cause;
+ }
+
+ IOException getFatalException() {
+ return fatalException;
+ }
+
public void init() throws IOException {
initializeMode(getMode());
}
@@ -996,7 +1018,8 @@ public class ReplicationLogGroup {
// is not processing any event like append/sync because this is the
only thread
// that is consuming the events from the ring buffer and handing them
off to the
// mode
- currentModeImpl.onExit(true);
+ ReplicationModeImpl oldModeImpl = currentModeImpl;
+ disruptorExecutor.execute(() -> oldModeImpl.onExit(true));
initializeMode(newMode);
}
}
@@ -1086,25 +1109,27 @@ public class ReplicationLogGroup {
*/
@Override
public void onEvent(LogEvent event, long sequence, boolean endOfBatch)
throws Exception {
- // Calculate time spent in ring buffer
long currentTimeNs = System.nanoTime();
long ringBufferTimeNs = currentTimeNs - event.timestampNs;
metrics.updateRingBufferTime(ringBufferTimeNs);
+ if (fatalException != null) {
+ // Append events are ignored; sync futures are failed immediately
+ // so producer threads unblock without waiting for the sync timeout.
+ if (event.type == EVENT_TYPE_SYNC) {
+ event.syncFuture.completeExceptionally(fatalException);
+ }
+ return;
+ }
try {
switch (event.type) {
case EVENT_TYPE_DATA:
currentModeImpl.append(event.record);
- // Process any pending syncs at the end of batch.
if (endOfBatch) {
processPendingSyncs(sequence);
}
return;
case EVENT_TYPE_SYNC:
- // Add this sync future to the pending list
- // OK, to add the same future multiple times when we rewind the
batch
- // as completing an already completed future is a no-op
pendingSyncFutures.add(event.syncFuture);
- // Process any pending syncs at the end of batch.
if (endOfBatch) {
processPendingSyncs(sequence);
}
@@ -1118,17 +1143,16 @@ public class ReplicationLogGroup {
e);
onFailure(event, sequence, e);
} catch (Exception fatalEx) {
- // Either we failed to switch the mode or we are in
STORE_AND_FORWARD mode
- // and got an exception. This is a fatal exception so halt the
disruptor
- // fail the pending sync events with the original exception
- failPendingSyncs(sequence, e);
- // Both SYNC and SAF writes failed. The local HBase WAL has the
mutation but neither
- // replication pipeline does, so the SAF forwarder cannot reconcile
it. Abort the RS
- // so region reassignment + preWALRestore re-ships the orphaned
edits.
- abort("Both SYNC and SAF replication writes failed", fatalEx);
- // halt the disruptor with the fatal exception
- throw fatalEx;
+ IOException fatalIOE =
+ asIOException("Both SYNC and SAF replication writes failed",
fatalEx);
+ setFatalException(fatalIOE);
+ failPendingSyncs(sequence, fatalIOE);
}
+ } catch (Throwable t) {
+ IOException wrapped =
+ new IOException("Unexpected error in event handler at sequence " +
sequence, t);
+ setFatalException(wrapped);
+ failPendingSyncs(sequence, wrapped);
}
}
@@ -1139,36 +1163,35 @@ public class ReplicationLogGroup {
@Override
public void onShutdown() {
- boolean isGracefulShutdown = gracefulShutdownEventHandlerFlag.get();
+ boolean graceful = (fatalException == null);
LOG.info("HAGroup {} shutting down event handler graceful={}",
ReplicationLogGroup.this,
- isGracefulShutdown);
- currentModeImpl.onExit(isGracefulShutdown);
+ graceful);
+ currentModeImpl.onExit(graceful);
}
}
/**
- * Handler for critical errors during the Disruptor lifecycle that closes
the writer to prevent
- * data loss.
+ * Safety-net handler for exceptions that escape the event handler. Should
never fire because
+ * onEvent catches all Throwables. If it does fire, sets a fatal exception
so pending futures
+ * fail.
*/
protected class LogExceptionHandler implements ExceptionHandler<LogEvent> {
@Override
public void handleEventException(Throwable e, long sequence, LogEvent
event) {
- String message = "Exception processing sequence " + sequence + " for
event " + event;
- LOG.error(message, e);
- closeOnError();
+ LOG.error("UNEXPECTED: Exception escaped onEvent at sequence {}",
sequence, e);
+ eventHandler
+ .setFatalException(new IOException("Exception escaped to
LogExceptionHandler", e));
}
@Override
public void handleOnStartException(Throwable e) {
LOG.error("Exception during Disruptor startup", e);
- closeOnError();
+ eventHandler.setFatalException(new IOException("Disruptor startup
failed", e));
}
@Override
public void handleOnShutdownException(Throwable e) {
- // Should not happen, but if it does, the regionserver is aborting or
shutting down.
LOG.error("Exception during Disruptor shutdown", e);
- closeOnError();
}
}
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
index 1238513135..114e519b42 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.replication;
import java.io.IOException;
import org.apache.phoenix.replication.ReplicationLogGroup.Record;
import org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -31,6 +33,7 @@ import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTes
* </p>
*/
public abstract class ReplicationModeImpl {
+ private static final Logger LOG =
LoggerFactory.getLogger(ReplicationModeImpl.class);
protected final ReplicationLogGroup logGroup;
// The mode manages the underlying log to which the append and sync events
will be sent
@@ -91,4 +94,17 @@ public abstract class ReplicationModeImpl {
log.close(graceful);
}
}
+
+ protected ReplicationLogGroup.ReplicationMode transitionToStoreAndForward()
throws IOException {
+ logGroup.getMetrics().incrementSyncToSafTransitions();
+ try {
+ logGroup.setHAGroupStatusToStoreAndForward();
+ } catch (Exception ex) {
+ String message =
+ String.format("HAGroup %s could not update status to
STORE_AND_FORWARD", logGroup);
+ LOG.error(message, ex);
+ throw ReplicationLogGroup.asIOException(message, ex);
+ }
+ return ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
+ }
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
index 7bcb4c89e0..8f205ea767 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
@@ -110,12 +110,10 @@ public class StoreAndForwardModeImpl extends
ReplicationModeImpl {
@Override
ReplicationMode onFailure(Throwable e) throws IOException {
- // Treating failures in STORE_AND_FORWARD mode as fatal errors
+ // Failures in STORE_AND_FORWARD mode are fatal — throw to let the event
handler abort
String message = String.format("HAGroup %s mode=%s got error", logGroup,
this);
LOG.error(message, e);
- logGroup.abort(message, e);
- // unreachable, we remain in the same mode
- return STORE_AND_FORWARD;
+ throw ReplicationLogGroup.asIOException(message, e);
}
@Override
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
index 02c57f7f3f..6c85ee27d4 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
@@ -17,7 +17,6 @@
*/
package org.apache.phoenix.replication;
-import static
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
import static
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.SYNC_AND_FORWARD;
import java.io.IOException;
@@ -62,17 +61,7 @@ public class SyncAndForwardModeImpl extends
ReplicationModeImpl {
@Override
ReplicationMode onFailure(Throwable e) throws IOException {
LOG.info("HAGroup {} mode={} got error", logGroup, this, e);
- logGroup.getMetrics().incrementSyncToSafTransitions();
- try {
- logGroup.setHAGroupStatusToStoreAndForward();
- } catch (Exception ex) {
- // Fatal error when we can't update the HAGroup status
- String message =
- String.format("HAGroup %s could not update status to
STORE_AND_FORWARD", logGroup);
- LOG.error(message, ex);
- logGroup.abort(message, ex);
- }
- return STORE_AND_FORWARD;
+ return transitionToStoreAndForward();
}
@Override
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
index 0a5b5a48c6..ca258a5b76 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
@@ -17,7 +17,6 @@
*/
package org.apache.phoenix.replication;
-import static
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
import static
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.SYNC;
import java.io.IOException;
@@ -57,18 +56,7 @@ public class SyncModeImpl extends ReplicationModeImpl {
@Override
ReplicationMode onFailure(Throwable e) throws IOException {
LOG.info("HAGroup {} mode={} got error", logGroup, this, e);
- logGroup.getMetrics().incrementSyncToSafTransitions();
- try {
- // first update the HAGroupStore state
- logGroup.setHAGroupStatusToStoreAndForward();
- } catch (Exception ex) {
- // Fatal error when we can't update the HAGroup status
- String message =
- String.format("HAGroup %s could not update status to
STORE_AND_FORWARD", logGroup);
- LOG.error(message, ex);
- logGroup.abort(message, ex);
- }
- return STORE_AND_FORWARD;
+ return transitionToStoreAndForward();
}
@Override
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
index 88eeb791e3..d062aa8c53 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
@@ -128,7 +128,6 @@ public class ReplicationLogReplay {
public void close() {
LOG.info("Closing ReplicationLogReplay for haGroup: {}", haGroupName);
- replicationLogDiscoveryReplay.getReplicationLogFileTracker().close();
replicationLogDiscoveryReplay.close();
// Remove the instance from cache
INSTANCES.remove(haGroupName);
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index ddd0cea0ec..3fd7702e6a 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -264,13 +264,12 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
logGroup.append(tableName, commitId, put);
// sync on the writer will timeout — syncInternal wraps in PhoenixWALSyncTimeoutException
- // and calls abort() which throws RuntimeException
+ // and calls abort() which rethrows the IOException cause directly
try {
logGroup.sync();
- fail("Should have thrown RuntimeException because sync timed out");
- } catch (RuntimeException e) {
- assertTrue("Expected PhoenixWALSyncTimeoutException cause",
- e.getCause() instanceof PhoenixWALSyncTimeoutException);
+ fail("Should have thrown PhoenixWALSyncTimeoutException because sync timed out");
+ } catch (PhoenixWALSyncTimeoutException e) {
+ // expected
}
// reset
doNothing().when(innerWriter).sync();
@@ -624,27 +623,25 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).append(anyString(),
anyLong(), any(Mutation.class));
- // Append data. This should trigger the LogExceptionHandler, which will close logWriter.
- // The sync future times out, syncInternal wraps in PhoenixWALSyncTimeoutException and aborts.
+ // Append publishes to the ring buffer. The event handler catches the RuntimeException via
+ // catch(Throwable), poisons itself, and fails the sync future. The producer receives
+ // ExecutionException and calls abort().
logGroup.append(tableName, commitId, put);
try {
logGroup.sync();
- fail("Should have thrown RuntimeException because sync timed out");
- } catch (RuntimeException e) {
- assertTrue("Expected PhoenixWALSyncTimeoutException cause",
- e.getCause() instanceof PhoenixWALSyncTimeoutException);
+ fail("Should have thrown IOException from abort");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("Unexpected error in event handler"));
}
- // Verify that subsequent operations fail because the log is closed
+ // Verify that subsequent operations fail
try {
logGroup.append(tableName, commitId + 1, put);
- fail("Should have thrown IOException because log is closed");
+ fail("Should have thrown IOException because log is poisoned/closed");
} catch (IOException e) {
- assertTrue("Expected an IOException because log is closed",
- e.getMessage().contains("Closed"));
+ // expected
}
- // Verify that the inner writer was closed by the LogExceptionHandler
verify(innerWriter, times(1)).close();
}
@@ -717,15 +714,14 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
// Append data
logGroup.append(tableName, commitId, put);
- // Try to sync. Should fail after exhausting retries and then switch to STORE_AND_FORWARD
+ // Sync fails, mode transition to SAF also fails because HAGroupStore update throws.
+ // The event handler poisons itself, and the producer calls abort().
try {
logGroup.sync();
- fail("Should have thrown exception because of failure to update mode");
- } catch (RuntimeException ex) {
- assertTrue(ex.getMessage().contains("Simulated sync failure"));
+ fail("Should have thrown IOException because of failure to update mode");
+ } catch (IOException ex) {
+ assertTrue(ex.getMessage().contains("Simulated failure to update HAGroupStore state"));
}
- // wait for the event processor thread to clean up
- Thread.sleep(3);
}
/**
@@ -971,33 +967,30 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
// Configure writer to throw RuntimeException on getLength()
doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).getLength();
- // Append data. This should trigger the LogExceptionHandler, which will close logWriter.
- // The sync future times out, syncInternal wraps in PhoenixWALSyncTimeoutException and aborts.
+ // Append publishes to the ring buffer. The event handler catches the RuntimeException,
+ // poisons itself, and fails the sync future. The producer calls abort().
logGroup.append(tableName, commitId, put);
try {
logGroup.sync();
- fail("Should have thrown RuntimeException because sync timed out");
- } catch (RuntimeException e) {
- assertTrue("Expected PhoenixWALSyncTimeoutException cause",
- e.getCause() instanceof PhoenixWALSyncTimeoutException);
+ fail("Should have thrown IOException from abort");
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("Unexpected error in event handler"));
}
- // Verify that subsequent operations fail because the log is closed
+ // Verify that subsequent operations fail
try {
logGroup.append(tableName, commitId + 1, put);
- fail("Should have thrown IOException because log is closed");
+ fail("Should have thrown IOException because log is poisoned/closed");
} catch (IOException e) {
- assertTrue("Expected an IOException because log is closed",
- e.getMessage().contains("Closed"));
+ // expected
}
- // Verify that the inner writer was closed by the LogExceptionHandler
verify(innerWriter, times(1)).close();
}
/**
- * Tests behavior when a RuntimeException occurs during append() after closeOnError() has been
- * called. Verifies that the system properly rejects sync operations after being closed.
+ * Tests behavior when a RuntimeException occurs during append and subsequent appends are
+ * rejected. Verifies that the system properly rejects operations after being poisoned/closed.
*/
@Test
public void testAppendAfterCloseOnError() throws Exception {
@@ -1013,32 +1006,30 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).append(anyString(),
anyLong(), any(Mutation.class));
- // Append data to trigger closeOnError()
+ // Append publishes to the ring buffer. The event handler catches the RuntimeException,
+ // poisons itself, and fails the sync future. The producer calls abort().
logGroup.append(tableName, commitId, put);
try {
logGroup.sync();
- fail("Should have thrown RuntimeException because sync timed out");
- } catch (RuntimeException e) {
- assertTrue("Expected PhoenixWALSyncTimeoutException cause",
- e.getCause() instanceof PhoenixWALSyncTimeoutException);
+ fail("Should have thrown IOException from abort");
+ } catch (IOException e) {
+ // expected
}
- // Verify that subsequent append operations fail because the log is closed
+ // Verify that subsequent append operations fail because the log is poisoned/closed
try {
logGroup.append(tableName, commitId, put);
- fail("Should have thrown IOException because log is closed");
+ fail("Should have thrown IOException because log is poisoned/closed");
} catch (IOException e) {
- assertTrue("Expected an IOException because log is closed",
- e.getMessage().contains("Closed"));
+ // expected
}
- // Verify that the inner writer was closed by the LogExceptionHandler
verify(innerWriter, times(1)).close();
}
/**
- * Tests behavior when a RuntimeException occurs during sync() after closeOnError() has been
- * called. Verifies that the system properly rejects sync operations after being closed.
+ * Tests behavior when a RuntimeException occurs during append and subsequent syncs are rejected.
+ * Verifies that the system properly rejects operations after being poisoned/closed.
*/
@Test
public void testSyncAfterCloseOnError() throws Exception {
@@ -1054,26 +1045,24 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
doThrow(new RuntimeException("Simulated critical error")).when(innerWriter).append(anyString(),
anyLong(), any(Mutation.class));
- // Append data to trigger closeOnError()
+ // Append publishes to the ring buffer. The event handler catches the RuntimeException,
+ // poisons itself, and fails the sync future. The producer calls abort().
logGroup.append(tableName, commitId, put);
try {
logGroup.sync();
- fail("Should have thrown RuntimeException because sync timed out");
- } catch (RuntimeException e) {
- assertTrue("Expected PhoenixWALSyncTimeoutException cause",
- e.getCause() instanceof PhoenixWALSyncTimeoutException);
+ fail("Should have thrown IOException from abort");
+ } catch (IOException e) {
+ // expected
}
- // Verify that subsequent sync operations fail because the log is closed
+ // Verify that subsequent sync operations fail because the log is poisoned/closed
try {
logGroup.sync();
- fail("Should have thrown IOException because log is closed");
+ fail("Should have thrown IOException because log is poisoned/closed");
} catch (IOException e) {
- assertTrue("Expected an IOException because log is closed",
- e.getMessage().contains("Closed"));
+ // expected
}
- // Verify that the inner writer was closed by the LogExceptionHandler
verify(innerWriter, times(1)).close();
}
@@ -1722,11 +1711,10 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
logGroup.append(tableName, commitId, put);
try {
logGroup.sync();
- fail("Should have thrown RuntimeException from abort");
- } catch (RuntimeException e) {
- assertTrue("Abort message should mention both SYNC and SAF",
- e.getMessage().contains("Both SYNC and SAF replication writes failed")
- || e.getMessage().contains("ABORTING"));
+ fail("Should have thrown IOException from abort");
+ } catch (IOException e) {
+ assertTrue("Abort message should contain sync failure",
+ e.getMessage().contains("Simulated sync failure"));
}
}
@@ -1764,10 +1752,10 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
logGroup.append(tableName, commitId, put);
try {
logGroup.sync();
- fail("Should have thrown RuntimeException from abort");
- } catch (RuntimeException e) {
+ fail("Should have thrown IOException from abort");
+ } catch (IOException e) {
assertTrue("Abort should fire from SAF failure path",
- e.getMessage().contains("ABORTING") || e.getMessage().contains("got error"));
+ e.getMessage().contains("Simulated SAF sync failure"));
}
}