This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
     new b653b8e465 PHOENIX-7854 Use Abortable interface to trigger RS aborts (#2474)
b653b8e465 is described below

commit b653b8e4659ba4832bd9cb099b63d8e2f05bca6c
Author: tkhurana <[email protected]>
AuthorDate: Mon May 18 15:48:18 2026 -0700

    PHOENIX-7854 Use Abortable interface to trigger RS aborts (#2474)
---
 .../coprocessor/PhoenixRegionServerEndpoint.java   |  11 +-
 .../phoenix/hbase/index/IndexRegionObserver.java   |  13 +-
 .../replication/ReplicationLogDiscovery.java       |   1 +
 .../phoenix/replication/ReplicationLogGroup.java   | 245 +++++++++++----------
 .../phoenix/replication/ReplicationModeImpl.java   |  16 ++
 .../replication/StoreAndForwardModeImpl.java       |   6 +-
 .../replication/SyncAndForwardModeImpl.java        |  13 +-
 .../apache/phoenix/replication/SyncModeImpl.java   |  14 +-
 .../replication/reader/ReplicationLogReplay.java   |   1 -
 .../replication/ReplicationLogGroupTest.java       | 118 +++++-----
 10 files changed, 229 insertions(+), 209 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index 3f35b40cef..f821e6995a 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -32,8 +32,11 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -62,12 +65,14 @@ import org.slf4j.LoggerFactory;
 /**
  * This is first implementation of RegionServer coprocessor introduced by 
Phoenix.
  */
+@CoreCoprocessor
 public class PhoenixRegionServerEndpoint extends
   RegionServerEndpointProtos.RegionServerEndpointService implements 
RegionServerCoprocessor {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PhoenixRegionServerEndpoint.class);
   private MetricsMetadataCachingSource metricsSource;
   protected Configuration conf;
   protected ServerName serverName;
+  private Abortable abortable;
   private ExecutorService prewarmExecutor;
 
   // regionserver level thread pool used by Uncovered Indexes to scan data 
table rows
@@ -79,6 +84,10 @@ public class PhoenixRegionServerEndpoint extends
     if (env instanceof RegionServerCoprocessorEnvironment) {
       this.serverName = ((RegionServerCoprocessorEnvironment) 
env).getServerName();
     }
+    // @CoreCoprocessor guarantees HasRegionServerServices, but guard for 
testability
+    if (env instanceof HasRegionServerServices) {
+      this.abortable = ((HasRegionServerServices) 
env).getRegionServerServices();
+    }
     this.metricsSource =
       
MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource();
     initUncoveredIndexThreadPool(this.conf);
@@ -308,7 +317,7 @@ public class PhoenixRegionServerEndpoint extends
               manager.getClusterRoleRecord(haGroup);
               if (shouldInitReplicationLogGroup) {
                 try {
-                  ReplicationLogGroup.get(conf, serverName, haGroup);
+                  ReplicationLogGroup.get(conf, serverName, haGroup, 
abortable);
                   LOGGER.info("Eagerly initialized ReplicationLogGroup {} on 
server {}", haGroup,
                     serverName);
                 } catch (Exception e) {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 1ad0579a45..b968914731 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellScanner;
@@ -70,6 +71,8 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.HasRegionServerServices;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -170,6 +173,7 @@ import 
org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
  * does batch mutations.
  * <p>
  */
+@CoreCoprocessor
 public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(IndexRegionObserver.class);
@@ -439,6 +443,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
   private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 
100;
   private byte[] encodedRegionName;
   private boolean shouldReplicate;
+  private Abortable abortable;
 
   // Don't replicate the mutation if this attribute is set
   private static final Predicate<Mutation> IGNORE_REPLICATION =
@@ -541,6 +546,10 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       if (this.shouldReplicate) {
         this.ignoreReplicationFilter = 
getSynchronousReplicationFilter(tableName);
       }
+      // @CoreCoprocessor guarantees HasRegionServerServices, but guard for 
testability
+      if (e instanceof HasRegionServerServices) {
+        this.abortable = ((HasRegionServerServices) 
e).getRegionServerServices();
+      }
     } catch (NoSuchMethodError ex) {
       disabled = true;
       LOG.error("Must be too early a version of HBase. Disabled coprocessor ", 
ex);
@@ -689,7 +698,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       byte[] haGroupName = 
m.getAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
       if (haGroupName != null) {
         ReplicationLogGroup logGroup = 
ReplicationLogGroup.get(env.getConfiguration(),
-          env.getServerName(), Bytes.toString(haGroupName));
+          env.getServerName(), Bytes.toString(haGroupName), abortable);
         return Optional.of(logGroup);
       }
     }
@@ -707,7 +716,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       
logKey.getExtendedAttribute(BaseScannerRegionObserverConstants.HA_GROUP_NAME_ATTRIB);
     if (haGroupName != null) {
       ReplicationLogGroup logGroup = 
ReplicationLogGroup.get(env.getConfiguration(),
-        env.getServerName(), Bytes.toString(haGroupName));
+        env.getServerName(), Bytes.toString(haGroupName), abortable);
       return Optional.of(logGroup);
     }
     return Optional.empty();
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
index 3c64e9ec61..ccd0e82caf 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogDiscovery.java
@@ -125,6 +125,7 @@ public abstract class ReplicationLogDiscovery {
   }
 
   public void close() {
+    replicationLogTracker.close();
     if (this.metrics != null) {
       this.metrics.close();
     }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index ded1fec7b1..1fe559b449 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -56,6 +56,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.phoenix.jdbc.HAGroupStoreManager;
@@ -67,7 +68,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.phoenix.thirdparty.com.google.common.base.Throwables;
 import 
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@@ -171,6 +171,9 @@ public class ReplicationLogGroup {
   public static final long DEFAULT_REPLICATION_LOG_PEER_INIT_TIMEOUT_MS = 
10_000L;
   public static final String WAL_SYNC_TIMEOUT_MS_KEY = 
"hbase.regionserver.wal.sync.timeout";
   public static final long DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000L;
+  public static final String REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS_KEY =
+    "phoenix.replication.log.group.shutdown.timeout.ms";
+  public static final long DEFAULT_REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS = 
30_000L;
 
   public static final String STANDBY_DIR = "in";
   public static final String FALLBACK_DIR = "out";
@@ -193,7 +196,9 @@ public class ReplicationLogGroup {
   protected ReplicationLogDiscoveryForwarder logForwarder;
   protected long syncTimeoutMs;
   protected long peerInitTimeoutMs;
-  protected volatile boolean closed = false;
+  protected final AtomicBoolean closed = new AtomicBoolean(false);
+  protected final Abortable abortable;
+  protected long shutdownTimeoutMs;
 
   /**
    * The replication mode determines how mutations are handled. Mode 
transitions occur automatically
@@ -331,8 +336,6 @@ public class ReplicationLogGroup {
   protected Disruptor<LogEvent> disruptor;
   protected RingBuffer<LogEvent> ringBuffer;
   protected LogEventHandler eventHandler;
-  // Used to inform the disruptor event thread whether this is a graceful or a 
forced shutdown
-  private final AtomicBoolean gracefulShutdownEventHandlerFlag = new 
AtomicBoolean();
 
   /**
    * Get or create a ReplicationLogGroup instance for the given HA Group.
@@ -344,10 +347,25 @@ public class ReplicationLogGroup {
    */
   public static ReplicationLogGroup get(Configuration conf, ServerName 
serverName,
     String haGroupName) throws IOException {
+    return get(conf, serverName, haGroupName, (Abortable) null);
+  }
+
+  /**
+   * Get or create a ReplicationLogGroup instance for the given HA Group.
+   * @param conf        Configuration object
+   * @param serverName  The server name
+   * @param haGroupName The HA Group name
+   * @param abortable   Abortable to invoke on fatal errors (typically 
RegionServerServices)
+   * @return ReplicationLogGroup instance
+   * @throws IOException if initialization fails
+   */
+  public static ReplicationLogGroup get(Configuration conf, ServerName 
serverName,
+    String haGroupName, Abortable abortable) throws IOException {
     try {
       return INSTANCES.computeIfAbsent(haGroupName, k -> {
         try {
-          ReplicationLogGroup group = new ReplicationLogGroup(conf, 
serverName, haGroupName);
+          ReplicationLogGroup group = new ReplicationLogGroup(conf, 
serverName, haGroupName,
+            HAGroupStoreManager.getInstance(conf), abortable);
           group.init();
           return group;
         } catch (IOException e) {
@@ -390,12 +408,14 @@ public class ReplicationLogGroup {
 
   /**
    * Protected constructor for ReplicationLogGroup.
-   * @param conf        Configuration object
-   * @param serverName  The server name
-   * @param haGroupName The HA Group name
+   * @param conf                Configuration object
+   * @param serverName          The server name
+   * @param haGroupName         The HA Group name
+   * @param haGroupStoreManager HA Group Store Manager instance
    */
-  protected ReplicationLogGroup(Configuration conf, ServerName serverName, 
String haGroupName) {
-    this(conf, serverName, haGroupName, HAGroupStoreManager.getInstance(conf));
+  protected ReplicationLogGroup(Configuration conf, ServerName serverName, 
String haGroupName,
+    HAGroupStoreManager haGroupStoreManager) {
+    this(conf, serverName, haGroupName, haGroupStoreManager, null);
   }
 
   /**
@@ -404,9 +424,10 @@ public class ReplicationLogGroup {
    * @param serverName          The server name
    * @param haGroupName         The HA Group name
    * @param haGroupStoreManager HA Group Store Manager instance
+   * @param abortable           Abortable to invoke on fatal errors (may be 
null)
    */
   protected ReplicationLogGroup(Configuration conf, ServerName serverName, 
String haGroupName,
-    HAGroupStoreManager haGroupStoreManager) {
+    HAGroupStoreManager haGroupStoreManager, Abortable abortable) {
     // conf object from coprocessor is instance of
     // org.apache.hadoop.hbase.coprocessor.ReadOnlyConfiguration and we need 
to modify it when
     // we send rpc to namenode so copying it
@@ -420,6 +441,9 @@ public class ReplicationLogGroup {
     this.serverName = serverName;
     this.haGroupName = haGroupName;
     this.haGroupStoreManager = haGroupStoreManager;
+    this.abortable = abortable;
+    this.shutdownTimeoutMs = 
clonedConf.getLong(REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS_KEY,
+      DEFAULT_REPLICATION_LOG_GROUP_SHUTDOWN_TIMEOUT_MS);
     this.metrics = createMetricsSource();
     this.mode = new AtomicReference<>(INIT);
   }
@@ -517,9 +541,13 @@ public class ReplicationLogGroup {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Append: table={}, commitId={}, mutation={}", tableName, 
commitId, mutation);
     }
-    if (closed) {
+    if (isClosed()) {
       throw new IOException("Closed");
     }
+    IOException fatal = eventHandler.getFatalException();
+    if (fatal != null) {
+      throw fatal;
+    }
     long startTime = System.nanoTime();
     try {
       // ringBuffer.next() claims the next sequence number. Because we 
initialize the Disruptor
@@ -557,9 +585,13 @@ public class ReplicationLogGroup {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Sync");
     }
-    if (closed) {
+    if (isClosed()) {
       throw new IOException("Closed");
     }
+    IOException fatal = eventHandler.getFatalException();
+    if (fatal != null) {
+      abort("ReplicationLogGroup has a fatal exception", fatal);
+    }
     long startTime = System.nanoTime();
     try {
       syncInternal();
@@ -589,16 +621,15 @@ public class ReplicationLogGroup {
       Thread.currentThread().interrupt();
       throw new InterruptedIOException("Interrupted while waiting for sync");
     } catch (ExecutionException e) {
-      // After exhausting all attempts to sync to the standby cluster we 
switch mode
-      // and then retry again. If that also fails, it is a fatal error
+      Throwable cause = e.getCause();
       String message = String.format("HAGroup %s sync operation failed", this);
-      LOG.error(message, e);
-      abort(message, e);
+      LOG.error(message, cause);
+      abort(message, cause);
     } catch (TimeoutException e) {
       String message =
         String.format("HAGroup %s replication log sync timed out after %d ms", 
this, syncTimeoutMs);
       LOG.error(message, e);
-      PhoenixWALSyncTimeoutException timeoutEx = new 
PhoenixWALSyncTimeoutException(message, e);
+      PhoenixWALSyncTimeoutException timeoutEx = new 
PhoenixWALSyncTimeoutException(message);
       abort(message, timeoutEx);
     }
   }
@@ -608,67 +639,33 @@ public class ReplicationLogGroup {
    * @return true if closed, false otherwise
    */
   public boolean isClosed() {
-    return closed;
-  }
-
-  /**
-   * Force closes the log group upon an unrecoverable internal error. This is 
a fail-stop behavior:
-   * once called, the log group is marked as closed, the Disruptor is halted, 
and all subsequent
-   * append() and sync() calls will throw an IOException("Closed"). This 
ensures that no further
-   * operations are attempted on a log group that has encountered a critical 
error.
-   */
-  protected void closeOnError() {
-    if (closed) {
-      return;
-    }
-    synchronized (this) {
-      if (closed) {
-        return;
-      }
-      // setting closed to true prevents future producers to add events to the 
ring buffer
-      closed = true;
-    }
-    // Directly halt the disruptor. shutdown() would wait for events to drain. 
We are expecting
-    // that will not work.
-    gracefulShutdownEventHandlerFlag.set(false);
-    disruptor.halt();
-    metrics.close();
-    LOG.info("HAGroup {} closed on error", this);
+    return closed.get();
   }
 
   /**
-   * Close the ReplicationLogGroup and all associated resources. This method 
is thread-safe and can
-   * be called multiple times.
+   * Close the ReplicationLogGroup. Drains pending events from the Disruptor 
with a bounded timeout
+   * (which triggers onShutdown → modeImpl.onExit → ReplicationLog.close), 
then cleans up all
+   * resources. When the event handler has a fatal exception the drain 
completes instantly because
+   * each event hits the short-circuit path. The instance is removed from the 
INSTANCES cache and
+   * subsequent append()/sync() calls throw IOException.
    */
   public void close() {
-    if (closed) {
+    if (!closed.compareAndSet(false, true)) {
       return;
     }
-    synchronized (this) {
-      if (closed) {
-        return;
-      }
-      LOG.info("Closing HAGroup {}", this);
-      // setting closed to true prevents future producers to add events to the 
ring buffer
-      closed = true;
-      // Remove from instances cache
-      INSTANCES.remove(haGroupName);
-      // Sync before shutting down to flush all pending appends.
-      try {
-        syncInternal();
-        gracefulShutdownEventHandlerFlag.set(true);
-        // waits until all the events in the disruptor have been processed
-        disruptor.shutdown();
-      } catch (IOException e) {
-        LOG.warn("Error during final sync on close", e);
-        gracefulShutdownEventHandlerFlag.set(false);
-        disruptor.halt(); // Go directly to halt.
-      }
-      // wait for the disruptor threads to finish
-      shutdownDisruptorExecutor();
-      metrics.close();
-      LOG.info("HAGroup {} closed", this);
+    LOG.info("Closing HAGroup {}", this);
+    INSTANCES.remove(haGroupName);
+    try {
+      disruptor.shutdown(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
+    } catch (com.lmax.disruptor.TimeoutException e) {
+      LOG.warn("HAGroup {} shutdown timed out after {}ms, halting", this, 
shutdownTimeoutMs);
+      disruptor.halt();
     }
+    shutdownDisruptorExecutor();
+    logForwarder.stop();
+    logForwarder.close();
+    metrics.close();
+    LOG.info("HAGroup {} closed", this);
   }
 
   /**
@@ -902,20 +899,33 @@ public class ReplicationLogGroup {
   }
 
   /**
-   * Abort when we hit a fatal error
+   * Throws the given cause as an IOException. If it already is one, rethrows 
directly; otherwise
+   * wraps it.
    */
-  protected void abort(String reason, Throwable cause) {
-    // TODO better to use abort using RegionServerServices
-    String msg = "***** ABORTING region server: " + reason + " *****";
-    if (cause != null) {
-      msg += "\nCause:\n" + Throwables.getStackTraceAsString(cause);
+  static IOException asIOException(String msg, Throwable cause) {
+    if (cause instanceof IOException) {
+      return (IOException) cause;
     }
-    LOG.error(msg);
-    if (cause != null) {
-      throw new RuntimeException(msg, cause);
-    } else {
-      throw new RuntimeException(msg);
+    return cause != null ? new IOException(msg, cause) : new IOException(msg);
+  }
+
+  /**
+   * Abort the region server due to an unrecoverable replication failure. Must 
be called from a
+   * producer thread (never the event handler thread). Sets the fatal 
exception so remaining events
+   * drain instantly, then closes all resources and invokes the Abortable to 
trigger RS shutdown.
+   * Always throws so the caller unwinds.
+   */
+  protected void abort(String reason, Throwable cause) throws IOException {
+    String msg = "Aborting region server due to replication failure: " + 
reason;
+    LOG.error(msg, cause);
+    // Idempotent; may already be set by the event handler (ExecutionException 
path)
+    // but is needed here for the TimeoutException path where the event 
handler is still alive.
+    eventHandler.setFatalException(asIOException(msg, cause));
+    close();
+    if (abortable != null) {
+      abortable.abort(msg, cause);
     }
+    throw asIOException(msg, cause);
   }
 
   /**
@@ -923,12 +933,24 @@ public class ReplicationLogGroup {
    */
   protected class LogEventHandler implements EventHandler<LogEvent>, 
LifecycleAware {
     private final List<CompletableFuture<Void>> pendingSyncFutures = new 
ArrayList<>();
-    // Current replication mode implementation which will handle the events
     private ReplicationModeImpl currentModeImpl;
+    private volatile IOException fatalException;
 
     public LogEventHandler() {
     }
 
+    void setFatalException(IOException cause) {
+      if (fatalException != null) {
+        return;
+      }
+      LOG.error("HAGroup {} event handler hit fatal exception", 
ReplicationLogGroup.this, cause);
+      this.fatalException = cause;
+    }
+
+    IOException getFatalException() {
+      return fatalException;
+    }
+
     public void init() throws IOException {
       initializeMode(getMode());
     }
@@ -996,7 +1018,8 @@ public class ReplicationLogGroup {
         // is not processing any event like append/sync because this is the 
only thread
         // that is consuming the events from the ring buffer and handing them 
off to the
         // mode
-        currentModeImpl.onExit(true);
+        ReplicationModeImpl oldModeImpl = currentModeImpl;
+        disruptorExecutor.execute(() -> oldModeImpl.onExit(true));
         initializeMode(newMode);
       }
     }
@@ -1086,25 +1109,27 @@ public class ReplicationLogGroup {
      */
     @Override
     public void onEvent(LogEvent event, long sequence, boolean endOfBatch) 
throws Exception {
-      // Calculate time spent in ring buffer
       long currentTimeNs = System.nanoTime();
       long ringBufferTimeNs = currentTimeNs - event.timestampNs;
       metrics.updateRingBufferTime(ringBufferTimeNs);
+      if (fatalException != null) {
+        // Append events are ignored; sync futures are failed immediately
+        // so producer threads unblock without waiting for the sync timeout.
+        if (event.type == EVENT_TYPE_SYNC) {
+          event.syncFuture.completeExceptionally(fatalException);
+        }
+        return;
+      }
       try {
         switch (event.type) {
           case EVENT_TYPE_DATA:
             currentModeImpl.append(event.record);
-            // Process any pending syncs at the end of batch.
             if (endOfBatch) {
               processPendingSyncs(sequence);
             }
             return;
           case EVENT_TYPE_SYNC:
-            // Add this sync future to the pending list
-            // OK, to add the same future multiple times when we rewind the 
batch
-            // as completing an already completed future is a no-op
             pendingSyncFutures.add(event.syncFuture);
-            // Process any pending syncs at the end of batch.
             if (endOfBatch) {
               processPendingSyncs(sequence);
             }
@@ -1118,17 +1143,16 @@ public class ReplicationLogGroup {
             e);
           onFailure(event, sequence, e);
         } catch (Exception fatalEx) {
-          // Either we failed to switch the mode or we are in 
STORE_AND_FORWARD mode
-          // and got an exception. This is a fatal exception so halt the 
disruptor
-          // fail the pending sync events with the original exception
-          failPendingSyncs(sequence, e);
-          // Both SYNC and SAF writes failed. The local HBase WAL has the 
mutation but neither
-          // replication pipeline does, so the SAF forwarder cannot reconcile 
it. Abort the RS
-          // so region reassignment + preWALRestore re-ships the orphaned 
edits.
-          abort("Both SYNC and SAF replication writes failed", fatalEx);
-          // halt the disruptor with the fatal exception
-          throw fatalEx;
+          IOException fatalIOE =
+            asIOException("Both SYNC and SAF replication writes failed", 
fatalEx);
+          setFatalException(fatalIOE);
+          failPendingSyncs(sequence, fatalIOE);
         }
+      } catch (Throwable t) {
+        IOException wrapped =
+          new IOException("Unexpected error in event handler at sequence " + 
sequence, t);
+        setFatalException(wrapped);
+        failPendingSyncs(sequence, wrapped);
       }
     }
 
@@ -1139,36 +1163,35 @@ public class ReplicationLogGroup {
 
     @Override
     public void onShutdown() {
-      boolean isGracefulShutdown = gracefulShutdownEventHandlerFlag.get();
+      boolean graceful = (fatalException == null);
       LOG.info("HAGroup {} shutting down event handler graceful={}", 
ReplicationLogGroup.this,
-        isGracefulShutdown);
-      currentModeImpl.onExit(isGracefulShutdown);
+        graceful);
+      currentModeImpl.onExit(graceful);
     }
   }
 
   /**
-   * Handler for critical errors during the Disruptor lifecycle that closes 
the writer to prevent
-   * data loss.
+   * Safety-net handler for exceptions that escape the event handler. Should 
never fire because
+   * onEvent catches all Throwables. If it does fire, sets a fatal exception 
so pending futures
+   * fail.
    */
   protected class LogExceptionHandler implements ExceptionHandler<LogEvent> {
     @Override
     public void handleEventException(Throwable e, long sequence, LogEvent 
event) {
-      String message = "Exception processing sequence " + sequence + "  for 
event " + event;
-      LOG.error(message, e);
-      closeOnError();
+      LOG.error("UNEXPECTED: Exception escaped onEvent at sequence {}", 
sequence, e);
+      eventHandler
+        .setFatalException(new IOException("Exception escaped to 
LogExceptionHandler", e));
     }
 
     @Override
     public void handleOnStartException(Throwable e) {
       LOG.error("Exception during Disruptor startup", e);
-      closeOnError();
+      eventHandler.setFatalException(new IOException("Disruptor startup 
failed", e));
     }
 
     @Override
     public void handleOnShutdownException(Throwable e) {
-      // Should not happen, but if it does, the regionserver is aborting or 
shutting down.
       LOG.error("Exception during Disruptor shutdown", e);
-      closeOnError();
     }
   }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
index 1238513135..114e519b42 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
@@ -20,6 +20,8 @@ package org.apache.phoenix.replication;
 import java.io.IOException;
 import org.apache.phoenix.replication.ReplicationLogGroup.Record;
 import org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 
@@ -31,6 +33,7 @@ import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTes
  * </p>
  */
 public abstract class ReplicationModeImpl {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationModeImpl.class);
   protected final ReplicationLogGroup logGroup;
 
   // The mode manages the underlying log to which the append and sync events 
will be sent
@@ -91,4 +94,17 @@ public abstract class ReplicationModeImpl {
       log.close(graceful);
     }
   }
+
+  protected ReplicationLogGroup.ReplicationMode transitionToStoreAndForward() 
throws IOException {
+    logGroup.getMetrics().incrementSyncToSafTransitions();
+    try {
+      logGroup.setHAGroupStatusToStoreAndForward();
+    } catch (Exception ex) {
+      String message =
+        String.format("HAGroup %s could not update status to 
STORE_AND_FORWARD", logGroup);
+      LOG.error(message, ex);
+      throw ReplicationLogGroup.asIOException(message, ex);
+    }
+    return ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
+  }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
index 7bcb4c89e0..8f205ea767 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/StoreAndForwardModeImpl.java
@@ -110,12 +110,10 @@ public class StoreAndForwardModeImpl extends 
ReplicationModeImpl {
 
   @Override
   ReplicationMode onFailure(Throwable e) throws IOException {
-    // Treating failures in STORE_AND_FORWARD mode as fatal errors
+    // Failures in STORE_AND_FORWARD mode are fatal — throw to let the event 
handler abort
     String message = String.format("HAGroup %s mode=%s got error", logGroup, 
this);
     LOG.error(message, e);
-    logGroup.abort(message, e);
-    // unreachable, we remain in the same mode
-    return STORE_AND_FORWARD;
+    throw ReplicationLogGroup.asIOException(message, e);
   }
 
   @Override
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
index 02c57f7f3f..6c85ee27d4 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncAndForwardModeImpl.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.replication;
 
-import static 
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
 import static 
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.SYNC_AND_FORWARD;
 
 import java.io.IOException;
@@ -62,17 +61,7 @@ public class SyncAndForwardModeImpl extends 
ReplicationModeImpl {
   @Override
   ReplicationMode onFailure(Throwable e) throws IOException {
     LOG.info("HAGroup {} mode={} got error", logGroup, this, e);
-    logGroup.getMetrics().incrementSyncToSafTransitions();
-    try {
-      logGroup.setHAGroupStatusToStoreAndForward();
-    } catch (Exception ex) {
-      // Fatal error when we can't update the HAGroup status
-      String message =
-        String.format("HAGroup %s could not update status to 
STORE_AND_FORWARD", logGroup);
-      LOG.error(message, ex);
-      logGroup.abort(message, ex);
-    }
-    return STORE_AND_FORWARD;
+    return transitionToStoreAndForward();
   }
 
   @Override
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
index 0a5b5a48c6..ca258a5b76 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SyncModeImpl.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.replication;
 
-import static 
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
 import static 
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.SYNC;
 
 import java.io.IOException;
@@ -57,18 +56,7 @@ public class SyncModeImpl extends ReplicationModeImpl {
   @Override
   ReplicationMode onFailure(Throwable e) throws IOException {
     LOG.info("HAGroup {} mode={} got error", logGroup, this, e);
-    logGroup.getMetrics().incrementSyncToSafTransitions();
-    try {
-      // first update the HAGroupStore state
-      logGroup.setHAGroupStatusToStoreAndForward();
-    } catch (Exception ex) {
-      // Fatal error when we can't update the HAGroup status
-      String message =
-        String.format("HAGroup %s could not update status to 
STORE_AND_FORWARD", logGroup);
-      LOG.error(message, ex);
-      logGroup.abort(message, ex);
-    }
-    return STORE_AND_FORWARD;
+    return transitionToStoreAndForward();
   }
 
   @Override
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
index 88eeb791e3..d062aa8c53 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplay.java
@@ -128,7 +128,6 @@ public class ReplicationLogReplay {
 
   public void close() {
     LOG.info("Closing ReplicationLogReplay for haGroup: {}", haGroupName);
-    replicationLogDiscoveryReplay.getReplicationLogFileTracker().close();
     replicationLogDiscoveryReplay.close();
     // Remove the instance from cache
     INSTANCES.remove(haGroupName);
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index ddd0cea0ec..3fd7702e6a 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -264,13 +264,12 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     logGroup.append(tableName, commitId, put);
 
     // sync on the writer will timeout — syncInternal wraps in 
PhoenixWALSyncTimeoutException
-    // and calls abort() which throws RuntimeException
+    // and calls abort() which rethrows the IOException cause directly
     try {
       logGroup.sync();
-      fail("Should have thrown RuntimeException because sync timed out");
-    } catch (RuntimeException e) {
-      assertTrue("Expected PhoenixWALSyncTimeoutException cause",
-        e.getCause() instanceof PhoenixWALSyncTimeoutException);
+      fail("Should have thrown PhoenixWALSyncTimeoutException because sync 
timed out");
+    } catch (PhoenixWALSyncTimeoutException e) {
+      // expected
     }
     // reset
     doNothing().when(innerWriter).sync();
@@ -624,27 +623,25 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     doThrow(new RuntimeException("Simulated critical 
error")).when(innerWriter).append(anyString(),
       anyLong(), any(Mutation.class));
 
-    // Append data. This should trigger the LogExceptionHandler, which will 
close logWriter.
-    // The sync future times out, syncInternal wraps in 
PhoenixWALSyncTimeoutException and aborts.
+    // Append publishes to the ring buffer. The event handler catches the 
RuntimeException via
+    // catch(Throwable), poisons itself, and fails the sync future. The 
producer receives
+    // ExecutionException and calls abort().
     logGroup.append(tableName, commitId, put);
     try {
       logGroup.sync();
-      fail("Should have thrown RuntimeException because sync timed out");
-    } catch (RuntimeException e) {
-      assertTrue("Expected PhoenixWALSyncTimeoutException cause",
-        e.getCause() instanceof PhoenixWALSyncTimeoutException);
+      fail("Should have thrown IOException from abort");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Unexpected error in event handler"));
     }
 
-    // Verify that subsequent operations fail because the log is closed
+    // Verify that subsequent operations fail
     try {
       logGroup.append(tableName, commitId + 1, put);
-      fail("Should have thrown IOException because log is closed");
+      fail("Should have thrown IOException because log is poisoned/closed");
     } catch (IOException e) {
-      assertTrue("Expected an IOException because log is closed",
-        e.getMessage().contains("Closed"));
+      // expected
     }
 
-    // Verify that the inner writer was closed by the LogExceptionHandler
     verify(innerWriter, times(1)).close();
   }
 
@@ -717,15 +714,14 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
 
     // Append data
     logGroup.append(tableName, commitId, put);
-    // Try to sync. Should fail after exhausting retries and then switch to 
STORE_AND_FORWARD
+    // Sync fails, mode transition to SAF also fails because HAGroupStore 
update throws.
+    // The event handler poisons itself, and the producer calls abort().
     try {
       logGroup.sync();
-      fail("Should have thrown exception because of failure to update mode");
-    } catch (RuntimeException ex) {
-      assertTrue(ex.getMessage().contains("Simulated sync failure"));
+      fail("Should have thrown IOException because of failure to update mode");
+    } catch (IOException ex) {
+      assertTrue(ex.getMessage().contains("Simulated failure to update 
HAGroupStore state"));
     }
-    // wait for the event processor thread to clean up
-    Thread.sleep(3);
   }
 
   /**
@@ -971,33 +967,30 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     // Configure writer to throw RuntimeException on getLength()
     doThrow(new RuntimeException("Simulated critical 
error")).when(innerWriter).getLength();
 
-    // Append data. This should trigger the LogExceptionHandler, which will 
close logWriter.
-    // The sync future times out, syncInternal wraps in 
PhoenixWALSyncTimeoutException and aborts.
+    // Append publishes to the ring buffer. The event handler catches the 
RuntimeException,
+    // poisons itself, and fails the sync future. The producer calls abort().
     logGroup.append(tableName, commitId, put);
     try {
       logGroup.sync();
-      fail("Should have thrown RuntimeException because sync timed out");
-    } catch (RuntimeException e) {
-      assertTrue("Expected PhoenixWALSyncTimeoutException cause",
-        e.getCause() instanceof PhoenixWALSyncTimeoutException);
+      fail("Should have thrown IOException from abort");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Unexpected error in event handler"));
     }
 
-    // Verify that subsequent operations fail because the log is closed
+    // Verify that subsequent operations fail
     try {
       logGroup.append(tableName, commitId + 1, put);
-      fail("Should have thrown IOException because log is closed");
+      fail("Should have thrown IOException because log is poisoned/closed");
     } catch (IOException e) {
-      assertTrue("Expected an IOException because log is closed",
-        e.getMessage().contains("Closed"));
+      // expected
     }
 
-    // Verify that the inner writer was closed by the LogExceptionHandler
     verify(innerWriter, times(1)).close();
   }
 
   /**
-   * Tests behavior when a RuntimeException occurs during append() after 
closeOnError() has been
-   * called. Verifies that the system properly rejects sync operations after 
being closed.
+   * Tests behavior when a RuntimeException occurs during append and 
subsequent appends are
+   * rejected. Verifies that the system properly rejects operations after 
being poisoned/closed.
    */
   @Test
   public void testAppendAfterCloseOnError() throws Exception {
@@ -1013,32 +1006,30 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     doThrow(new RuntimeException("Simulated critical 
error")).when(innerWriter).append(anyString(),
       anyLong(), any(Mutation.class));
 
-    // Append data to trigger closeOnError()
+    // Append publishes to the ring buffer. The event handler catches the 
RuntimeException,
+    // poisons itself, and fails the sync future. The producer calls abort().
     logGroup.append(tableName, commitId, put);
     try {
       logGroup.sync();
-      fail("Should have thrown RuntimeException because sync timed out");
-    } catch (RuntimeException e) {
-      assertTrue("Expected PhoenixWALSyncTimeoutException cause",
-        e.getCause() instanceof PhoenixWALSyncTimeoutException);
+      fail("Should have thrown IOException from abort");
+    } catch (IOException e) {
+      // expected
     }
 
-    // Verify that subsequent append operations fail because the log is closed
+    // Verify that subsequent append operations fail because the log is 
poisoned/closed
     try {
       logGroup.append(tableName, commitId, put);
-      fail("Should have thrown IOException because log is closed");
+      fail("Should have thrown IOException because log is poisoned/closed");
     } catch (IOException e) {
-      assertTrue("Expected an IOException because log is closed",
-        e.getMessage().contains("Closed"));
+      // expected
     }
 
-    // Verify that the inner writer was closed by the LogExceptionHandler
     verify(innerWriter, times(1)).close();
   }
 
   /**
-   * Tests behavior when a RuntimeException occurs during sync() after 
closeOnError() has been
-   * called. Verifies that the system properly rejects sync operations after 
being closed.
+   * Tests behavior when a RuntimeException occurs during append and 
subsequent syncs are rejected.
+   * Verifies that the system properly rejects operations after being 
poisoned/closed.
    */
   @Test
   public void testSyncAfterCloseOnError() throws Exception {
@@ -1054,26 +1045,24 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     doThrow(new RuntimeException("Simulated critical 
error")).when(innerWriter).append(anyString(),
       anyLong(), any(Mutation.class));
 
-    // Append data to trigger closeOnError()
+    // Append publishes to the ring buffer. The event handler catches the 
RuntimeException,
+    // poisons itself, and fails the sync future. The producer calls abort().
     logGroup.append(tableName, commitId, put);
     try {
       logGroup.sync();
-      fail("Should have thrown RuntimeException because sync timed out");
-    } catch (RuntimeException e) {
-      assertTrue("Expected PhoenixWALSyncTimeoutException cause",
-        e.getCause() instanceof PhoenixWALSyncTimeoutException);
+      fail("Should have thrown IOException from abort");
+    } catch (IOException e) {
+      // expected
     }
 
-    // Verify that subsequent sync operations fail because the log is closed
+    // Verify that subsequent sync operations fail because the log is 
poisoned/closed
     try {
       logGroup.sync();
-      fail("Should have thrown IOException because log is closed");
+      fail("Should have thrown IOException because log is poisoned/closed");
     } catch (IOException e) {
-      assertTrue("Expected an IOException because log is closed",
-        e.getMessage().contains("Closed"));
+      // expected
     }
 
-    // Verify that the inner writer was closed by the LogExceptionHandler
     verify(innerWriter, times(1)).close();
   }
 
@@ -1722,11 +1711,10 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     logGroup.append(tableName, commitId, put);
     try {
       logGroup.sync();
-      fail("Should have thrown RuntimeException from abort");
-    } catch (RuntimeException e) {
-      assertTrue("Abort message should mention both SYNC and SAF",
-        e.getMessage().contains("Both SYNC and SAF replication writes failed")
-          || e.getMessage().contains("ABORTING"));
+      fail("Should have thrown IOException from abort");
+    } catch (IOException e) {
+      assertTrue("Abort message should contain sync failure",
+        e.getMessage().contains("Simulated sync failure"));
     }
   }
 
@@ -1764,10 +1752,10 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     logGroup.append(tableName, commitId, put);
     try {
       logGroup.sync();
-      fail("Should have thrown RuntimeException from abort");
-    } catch (RuntimeException e) {
+      fail("Should have thrown IOException from abort");
+    } catch (IOException e) {
       assertTrue("Abort should fire from SAF failure path",
-        e.getMessage().contains("ABORTING") || e.getMessage().contains("got 
error"));
+        e.getMessage().contains("Simulated SAF sync failure"));
     }
   }
 


Reply via email to