This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch interrupt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8fe849985e6f0403d1b38945eb39319d892bcee7
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 14 21:13:37 2026 +0800

    gix
---
 .../realtime/disruptor/BatchEventProcessor.java    | 11 +++++---
 .../realtime/disruptor/DisruptorShutdownTest.java  | 30 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
index d0432821cf7..6a3b2cc62fd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
@@ -80,11 +80,14 @@ public final class BatchEventProcessor<T> implements Runnable {
         nextSequence = processAvailableEvents(nextSequence, availableSequence);
 
       } catch (final InterruptedException ex) {
-        if (running) {
-          Thread.currentThread().interrupt();
-          LOGGER.info("Processor interrupted");
+        if (!running) {
+          break;
         }
-        break;
+        // A transient interrupt should not permanently stop the consumer thread. Otherwise the
+        // gating sequence will stop advancing and producers may block forever on a full ring
+        // buffer, making the later close path appear stuck.
+        Thread.interrupted();
+        LOGGER.warn("Processor interrupted unexpectedly, continue running");
       } catch (final Throwable ex) {
         exceptionHandler.handleEventException(ex, nextSequence, ringBuffer.get(nextSequence));
         sequence.set(nextSequence);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
index 3fd40c4d4f2..9bdcf42de7d 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
@@ -113,6 +113,36 @@ public class DisruptorShutdownTest {
     Assert.assertFalse(processorThread.isAlive());
   }
 
+  @Test
+  public void testUnexpectedInterruptDoesNotStopProcessor() throws Exception {
+    final AtomicReference<Thread> processorThreadReference = new AtomicReference<>();
+    final ThreadFactory threadFactory =
+        runnable -> {
+          final Thread thread =
+              new Thread(runnable, "pipe-disruptor-unexpected-interrupt-test");
+          processorThreadReference.set(thread);
+          return thread;
+        };
+
+    final CountDownLatch handled = new CountDownLatch(1);
+    final Disruptor<TestEvent> disruptor = new Disruptor<>(TestEvent::new, 32, threadFactory);
+    final RingBuffer<TestEvent> ringBuffer =
+        disruptor.handleEventsWith((event, sequence, endOfBatch) -> handled.countDown()).start();
+
+    final Thread processorThread = processorThreadReference.get();
+    Assert.assertNotNull(processorThread);
+
+    TimeUnit.MILLISECONDS.sleep(50);
+    processorThread.interrupt();
+
+    ringBuffer.publishEvent((event, sequence, value) -> event.value = value, 1);
+    Assert.assertTrue(handled.await(5, TimeUnit.SECONDS));
+    Assert.assertTrue(processorThread.isAlive());
+
+    disruptor.shutdown();
+    Assert.assertFalse(processorThread.isAlive());
+  }
+
   private static class TestEvent {
     private int value;
   }

Reply via email to