This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new 9cd146ae2f PHOENIX-7864 Wake idle consumer on rotation via swap event 
(#2487)
9cd146ae2f is described below

commit 9cd146ae2fd07c9552048a2439a916026bfffc3d
Author: tkhurana <[email protected]>
AuthorDate: Thu May 21 16:41:26 2026 -0700

    PHOENIX-7864 Wake idle consumer on rotation via swap event (#2487)
    
    When the producer is idle, a scheduled rotation stages a new writer in
    pendingWriter, but the consumer never reaches checkAndReplaceWriter()
    because that runs only on DATA/SYNC events, so the old writer stays
    open. The reader's round-buffer interval can then expire and trigger
    HDFS lease recovery on the still-open writer.
    
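    Add EVENT_TYPE_SWAP, which LogRotationTask publishes after staging the
    new writer. Publishing uses tryNext() and never blocks; if the ring
    buffer is full the event is skipped, since a busy consumer reaches
    checkAndReplaceWriter() on its own. The SWAP handler performs the
    writer swap, and endOfBatch handling is now shared by all event types
    so that a SWAP ending a batch does not strand pending sync futures.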
---
 .../apache/phoenix/replication/ReplicationLog.java |   7 ++
 .../phoenix/replication/ReplicationLogGroup.java   |  44 +++++++--
 .../phoenix/replication/ReplicationModeImpl.java   |   3 -
 .../replication/ReplicationLogGroupTest.java       | 104 +++++++++++++++++++--
 4 files changed, 139 insertions(+), 19 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index 16e62f5bb8..44d4972e75 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -439,9 +439,11 @@ public class ReplicationLog {
       if (closed.get()) {
         return;
       }
+      boolean staged = false;
       try {
         LogFileWriter newWriter = createNewWriter();
         LogFileWriter undrained = pendingWriter.getAndSet(newWriter);
+        staged = true;
         if (undrained != null) {
           closeWriter(undrained);
         }
@@ -469,6 +471,11 @@ public class ReplicationLog {
           latch.countDown();
           rotationStagedLatch = null;
         }
+        if (staged) {
+          // Wake an idle consumer so it drains pendingWriter before the 
reader's round buffer
+          // expires. Non-blocking — see ReplicationLogGroup#publishSwapEvent.
+          logGroup.publishSwapEvent();
+        }
       }
     }
   }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index b3dc9f7f00..e7e5a69a3b 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.replication;
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
 import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
 import static 
org.apache.phoenix.replication.ReplicationLogGroup.LogEvent.EVENT_TYPE_DATA;
+import static 
org.apache.phoenix.replication.ReplicationLogGroup.LogEvent.EVENT_TYPE_SWAP;
 import static 
org.apache.phoenix.replication.ReplicationLogGroup.LogEvent.EVENT_TYPE_SYNC;
 import static 
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.INIT;
 import static 
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode.STORE_AND_FORWARD;
@@ -29,6 +30,7 @@ import static 
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.InsufficientCapacityException;
 import com.lmax.disruptor.LifecycleAware;
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.YieldingWaitStrategy;
@@ -314,6 +316,7 @@ public class ReplicationLogGroup {
 
     public static final byte EVENT_TYPE_DATA = 0;
     public static final byte EVENT_TYPE_SYNC = 1;
+    public static final byte EVENT_TYPE_SWAP = 2;
 
     public void setValues(int type, Record record, CompletableFuture<Void> 
syncFuture) {
       this.type = type;
@@ -638,6 +641,32 @@ public class ReplicationLogGroup {
     }
   }
 
+  /**
+   * Publish a non-blocking swap marker so an idle consumer wakes up and runs
+   * {@link ReplicationLog#checkAndReplaceWriter(boolean)} promptly. Called by
+   * {@link ReplicationLog.LogRotationTask} after staging a new pending writer.
+   * <p>
+   * Uses {@code tryNext()} so a full ring buffer never blocks the rotation thread — when the buffer
+   * is full the consumer is actively draining and will hit {@code checkAndReplaceWriter} on its
+   * own. The event carries no record and no sync future.
+   * <p>
+   * NOTE(review): this runs on the rotation thread while appenders may publish concurrently, so it
+   * relies on the ring buffer having a multi-producer sequencer; confirm in the Disruptor setup.
+   */
+  protected void publishSwapEvent() {
+    try {
+      long sequence = ringBuffer.tryNext();
+      try {
+        LogEvent event = ringBuffer.get(sequence);
+        event.setValues(EVENT_TYPE_SWAP, null, null);
+      } finally {
+        ringBuffer.publish(sequence);
+      }
+    } catch (InsufficientCapacityException e) {
+      LOG.debug("HAGroup {} ring buffer full, skipping swap event publish", this);
+    }
+  }
+
   /**
    * Check if this ReplicationLogGroup is closed.
    * @return true if closed, false otherwise
@@ -1128,19 +1154,21 @@ public class ReplicationLogGroup {
         switch (event.type) {
           case EVENT_TYPE_DATA:
             currentModeImpl.append(event.record);
-            if (endOfBatch) {
-              processPendingSyncs(sequence);
-            }
-            return;
+            break;
           case EVENT_TYPE_SYNC:
             pendingSyncFutures.add(event.syncFuture);
-            if (endOfBatch) {
-              processPendingSyncs(sequence);
-            }
-            return;
+            break;
+          case EVENT_TYPE_SWAP:
+            // Wake-up marker from LogRotationTask. Drain the staged writer so 
the old writer is
+            // closed before the reader's round buffer expires. No append/sync 
to perform.
+            currentModeImpl.getReplicationLog().checkAndReplaceWriter(true);
+            break;
           default:
             throw new UnsupportedOperationException("Unknown event type: " + 
event.type);
         }
+        if (endOfBatch) {
+          processPendingSyncs(sequence);
+        }
       } catch (IOException e) {
         try {
           LOG.info("Failed to process event at sequence {} on mode {}", 
sequence, currentModeImpl,
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
index 114e519b42..4c5b9e5fbe 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationModeImpl.java
@@ -23,8 +23,6 @@ import 
org.apache.phoenix.replication.ReplicationLogGroup.ReplicationMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 /**
  * Base class for different replication modes.
  * <p>
@@ -69,7 +67,6 @@ public abstract class ReplicationModeImpl {
   }
 
   /** Returns the underlying log abstraction */
-  @VisibleForTesting
   ReplicationLog getReplicationLog() {
     return log;
   }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 55bce05d74..c204cd0a27 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -1267,6 +1267,12 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     LogFileWriter writer = activeLog.getWriter();
     assertNotNull("Writer should not be null", writer);
 
+    // Configure all stubs before publishing any events. Stubbing a Mockito 
spy while the
+    // consumer thread is concurrently invoking methods on the same spy is 
racy — Mockito's
+    // invocation/stub state is not thread-safe and the partially-applied stub 
can be matched
+    // against an unrelated method on the consumer thread.
+    doThrow(new IOException("Simulate append 
failure")).when(writer).append(tableName, commitId5,
+      put5);
     // Rotated writers must also fail on the 5th append so the retry doesn't 
rescue the loop.
     doAnswer(invocation -> {
       LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
@@ -1279,11 +1285,6 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     logGroup.append(tableName, commitId2, put2);
     logGroup.append(tableName, commitId3, put3);
     logGroup.append(tableName, commitId4, put4);
-
-    // configure initial writer to throw IOException on the 5th append
-    doThrow(new IOException("Simulate append 
failure")).when(writer).append(tableName, commitId5,
-      put5);
-
     logGroup.append(tableName, commitId5, put5);
     logGroup.sync();
 
@@ -1393,12 +1394,14 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     LogFileWriter initialWriter = activeLog.getWriter();
     assertNotNull("Initial writer should not be null", initialWriter);
 
+    // Configure initial writer's sync to fail (simulating broken stream after 
lease recovery).
+    // Done before the append publishes anything so the consumer thread cannot 
race with this
+    // stub installation on the same Mockito spy.
+    doThrow(new IOException("Simulated broken 
stream")).when(initialWriter).sync();
+
     // Append a record — goes to initialWriter
     logGroup.append(tableName, commitId, put);
 
-    // Configure initial writer's sync to fail (simulating broken stream after 
lease recovery)
-    doThrow(new IOException("Simulated broken 
stream")).when(initialWriter).sync();
-
     // Stage a pending writer via forced rotation
     activeLog.forceRotation();
 
@@ -1995,4 +1998,108 @@ public class ReplicationLogGroupTest extends ReplicationLogBaseTest {
     assertFalse("On-demand rotation must clear rotationRequested",
       activeLog.rotationRequested.get());
   }
+
+  /**
+   * Tests that a rotation tick wakes an idle consumer via the synthetic swap event so the staged
+   * pendingWriter is drained without waiting for the next real append/sync. Prior to this fix the
+   * undrained writer remained installed until the next data event, which on an idle system could
+   * exceed the reader's round buffer and trigger lease recovery.
+   */
+  @Test
+  public void testIdleConsumerSwapsOnRotation() throws Exception {
+    final String tableName = "TBLICSOR";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+    final long commitId = 1L;
+    final int roundDurationSeconds = 2;
+
+    conf.setInt(PHOENIX_REPLICATION_ROUND_DURATION_SECONDS_KEY, roundDurationSeconds);
+    recreateLogGroup();
+
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    LogFileWriter initialWriter = activeLog.getWriter();
+    assertNotNull("Initial writer should not be null", initialWriter);
+
+    // Establish baseline: append + sync to clear currentBatch
+    logGroup.append(tableName, commitId, put);
+    logGroup.sync();
+
+    // Wait past one rotation tick. With the synthetic swap event, the consumer wakes and drains
+    // pendingWriter — no further append/sync needed.
+    waitForRotationTick(roundDurationSeconds);
+
+    // The swap runs on the consumer thread after the rotation task publishes the SWAP event, so
+    // poll (bounded) instead of reading the writer once, which could race the consumer.
+    long deadline = System.currentTimeMillis() + 5000;
+    while (activeLog.getWriter() == initialWriter && System.currentTimeMillis() < deadline) {
+      Thread.sleep(10);
+    }
+    LogFileWriter writerAfterIdleRotation = activeLog.getWriter();
+    assertTrue("Writer should have swapped after idle rotation tick without further events",
+      writerAfterIdleRotation != initialWriter);
+
+    // The initial writer should have been closed asynchronously by the swap.
+    verify(initialWriter, timeout(5000).times(1)).close();
+  }
+
+  /**
+   * Verifies that {@code publishSwapEvent} silently skips when the ring buffer is full — the
+   * consumer is actively draining and will hit {@code checkAndReplaceWriter} on its next event.
+   */
+  @Test
+  public void testPublishSwapEventOnFullRingBufferIsNoop() throws Exception {
+    final String tableName = "TBLPSEFRB";
+    final Mutation put = LogFileTestUtil.newPut("row", 1, 1);
+
+    LogFileWriter innerWriter = logGroup.getActiveLog().getWriter();
+    assertNotNull("Inner writer should not be null", innerWriter);
+
+    // Hold the consumer so the ring buffer cannot drain
+    final CountDownLatch holdConsumer = new CountDownLatch(1);
+    final CountDownLatch ringFull = new CountDownLatch(1);
+    // Records a filler failure (instead of swallowing it) so it surfaces as a test failure.
+    final java.util.concurrent.atomic.AtomicReference<Exception> fillerError =
+      new java.util.concurrent.atomic.AtomicReference<>();
+    doAnswer(invocation -> {
+      holdConsumer.await();
+      return invocation.callRealMethod();
+    }).when(innerWriter).append(anyString(), anyLong(), any(Mutation.class));
+
+    Thread filler = new Thread(() -> {
+      try {
+        long id = 0;
+        // Fill all ring buffer slots. The consumer is blocked on the first event so its
+        // gating sequence never advances, and these N publishes saturate the buffer.
+        for (int i = 0; i < TEST_RINGBUFFER_SIZE; i++) {
+          logGroup.append(tableName, id++, put);
+        }
+        ringFull.countDown();
+        // The next append will block until the consumer drains.
+        logGroup.append(tableName, id, put);
+      } catch (Exception e) {
+        fillerError.set(e);
+      }
+    });
+    filler.setDaemon(true);
+    filler.start();
+
+    try {
+      // Wait (bounded) until the filler has saturated the ring buffer; an unbounded await would
+      // hang the suite forever if the filler failed before counting down.
+      boolean saturated = ringFull.await(30, java.util.concurrent.TimeUnit.SECONDS);
+      assertTrue("Filler did not saturate the ring buffer, error: " + fillerError.get(),
+        saturated);
+
+      // Should not throw / hang even though the ring buffer is full.
+      logGroup.publishSwapEvent();
+    } finally {
+      // Always release the consumer, even if an assertion above failed, so the filler thread can
+      // finish and tearDown can close cleanly.
+      holdConsumer.countDown();
+    }
+    filler.join(5000);
+    assertFalse("Filler thread should finish once the consumer is released", filler.isAlive());
+    if (fillerError.get() != null) {
+      throw new AssertionError("Filler thread failed", fillerError.get());
+    }
+  }
 }

Reply via email to