This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b614d6d16b5 Pipe: Fixed the bug that dropping a pipe may get stuck when
the disruptor is interrupted or the ring buffer is full (#17673)
b614d6d16b5 is described below
commit b614d6d16b5ab2ed50216028e4467b165004f6fb
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 2 14:29:00 2026 +0800
Pipe: Fixed the bug that dropping a pipe may get stuck when the disruptor is interrupted
or the ring buffer is full (#17673)
* Fix interrupt
* spotless
---
.../apache/iotdb/db/i18n/DataNodePipeMessages.java | 2 +
.../apache/iotdb/db/i18n/DataNodePipeMessages.java | 1 +
.../realtime/assigner/DisruptorQueue.java | 19 ++++++--
.../realtime/assigner/PipeDataRegionAssigner.java | 41 ++++++++++++++--
.../realtime/disruptor/BatchEventProcessor.java | 11 +++--
.../realtime/disruptor/MultiProducerSequencer.java | 21 ++++++++
.../dataregion/realtime/disruptor/RingBuffer.java | 22 ++++++++-
.../realtime/disruptor/DisruptorShutdownTest.java | 56 ++++++++++++++++++++++
8 files changed, 160 insertions(+), 13 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index 096aa07914b..f669934149d 100644
---
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -544,6 +544,8 @@ public final class DataNodePipeMessages {
public static final String PIPE_UNSUPPORTED_SOURCE_REALTIME_MODE_CREATE_A =
"Pipe: Unsupported source realtime mode: {}, create a hybrid source.";
public static final String PROCESSOR_INTERRUPTED = "Processor interrupted";
+ public static final String PROCESSOR_INTERRUPTED_UNEXPECTEDLY =
+ "Processor interrupted unexpectedly, continue running";
public static final String PROCESSOR_STOPPED = "Processor stopped";
public static final String SET_FOR_HISTORICAL_DELETION_EVENT =
"[{}]Set {} for historical deletion event {}";
diff --git
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index 4d514c19ba5..131a83cf808 100644
---
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -521,6 +521,7 @@ public final class DataNodePipeMessages {
public static final String PIPE_UNSUPPORTED_SOURCE_REALTIME_MODE_CREATE_A =
"Pipe:不支持的 source realtime mode: {}, create a hybrid source。";
public static final String PROCESSOR_INTERRUPTED = "处理器被中断";
+ public static final String PROCESSOR_INTERRUPTED_UNEXPECTEDLY =
"处理器意外中断,继续运行";
public static final String PROCESSOR_STOPPED = "处理器已停止";
public static final String SET_FOR_HISTORICAL_DELETION_EVENT =
"[{}]Set {} for historical deletion event {}";
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
index dc84095fb74..3ef29dbc900 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java
@@ -88,16 +88,29 @@ public class DisruptorQueue {
}
public void publish(final PipeRealtimeEvent event) {
+ publishOrDrop(event);
+ }
+
+ public boolean publishOrDrop(final PipeRealtimeEvent event) {
final EnrichedEvent innerEvent = event.getEvent();
if (innerEvent instanceof PipeHeartbeatEvent) {
((PipeHeartbeatEvent) innerEvent).recordDisruptorSize(ringBuffer);
}
- ringBuffer.publishEvent((container, sequence, o) ->
container.setEvent(event), event);
- mayPrintExceedingLog();
+ final boolean published =
+ ringBuffer.publishEvent(
+ (container, sequence, o) -> container.setEvent(event), event,
this::isClosed);
+ if (published) {
+ mayPrintExceedingLog();
+ }
+ return published;
}
- public void shutdown() {
+ public void closeInput() {
isClosed = true;
+ }
+
+ public void shutdown() {
+ closeInput();
// use shutdown instead of halt to ensure all published events have been
handled
disruptor.shutdown();
allocatedMemoryBlock.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 3f84e138a48..90466ba3389 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -71,6 +71,7 @@ public class PipeDataRegionAssigner implements Closeable {
private volatile int listenToInsertNodeSourceCount = 0;
private final PipeEventCounter eventCounter = new
PipeDataRegionEventCounter();
+ private int inFlightPublishCount = 0;
public int getDataRegionId() {
return dataRegionId;
@@ -104,14 +105,28 @@ public class PipeDataRegionAssigner implements Closeable {
((PipeHeartbeatEvent) innerEvent).onPublished();
}
- // use synchronized here for completely preventing reference count leaks
under extreme thread
- // scheduling when closing
synchronized (this) {
- if (!disruptor.isClosed()) {
- disruptor.publish(event);
- } else {
+ if (disruptor.isClosed()) {
onAssignedHook(event);
+ return;
}
+ inFlightPublishCount++;
+ }
+
+ boolean isPublished = false;
+ try {
+ isPublished = disruptor.publishOrDrop(event);
+ } finally {
+ synchronized (this) {
+ inFlightPublishCount--;
+ if (inFlightPublishCount == 0) {
+ notifyAll();
+ }
+ }
+ }
+
+ if (!isPublished) {
+ onAssignedHook(event);
}
}
@@ -276,9 +291,25 @@ public class PipeDataRegionAssigner implements Closeable {
public synchronized void close() {
PipeAssignerMetrics.getInstance().deregister(dataRegionId);
+ boolean interrupted = false;
+ disruptor.closeInput();
+ while (inFlightPublishCount > 0) {
+ try {
+ wait();
+ } catch (final InterruptedException e) {
+ interrupted = true;
+ LOGGER.warn(
+ "Interrupted while waiting for in-flight publishes to finish when
closing assigner on data region {}.",
+ dataRegionId);
+ }
+ }
+
final long startTime = System.currentTimeMillis();
disruptor.shutdown();
matcher.clear();
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
LOGGER.info(
DataNodePipeMessages.PIPE_ASSIGNER_ON_DATA_REGION_SHUTDOWN_INTERNAL,
dataRegionId,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
index 0227e07d5f0..824f7e84b9b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java
@@ -82,11 +82,14 @@ public final class BatchEventProcessor<T> implements
Runnable {
nextSequence = processAvailableEvents(nextSequence, availableSequence);
} catch (final InterruptedException ex) {
- if (running) {
- Thread.currentThread().interrupt();
- LOGGER.info(DataNodePipeMessages.PROCESSOR_INTERRUPTED);
+ if (!running) {
+ break;
}
- break;
+ // A transient interrupt should not permanently stop the consumer
thread. Otherwise the
+ // gating sequence will stop advancing and producers may block forever
on a full ring
+ // buffer, making the later close path appear stuck.
+ Thread.interrupted();
+ LOGGER.warn(DataNodePipeMessages.PROCESSOR_INTERRUPTED_UNEXPECTEDLY);
} catch (final Throwable ex) {
exceptionHandler.handleEventException(ex, nextSequence,
ringBuffer.get(nextSequence));
sequence.set(nextSequence);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
index 22a85da5ffd..cb0efe3039d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java
@@ -25,6 +25,7 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
+import java.util.function.BooleanSupplier;
/**
* Multi-producer sequencer for coordinating concurrent publishers
@@ -112,14 +113,31 @@ public final class MultiProducerSequencer {
* @return highest claimed sequence number
*/
public long next(int n) {
+ return next(n, () -> false);
+ }
+
+ /**
+ * Claim next n sequences for publishing, or abort if the caller is closing.
+ *
+ * @param n number of sequences to claim
+ * @param abortCondition returns {@code true} if the claim should be
abandoned
+ * @return highest claimed sequence number, or {@link
Sequence#INITIAL_VALUE} if aborted
+ */
+ public long next(final int n, final BooleanSupplier abortCondition) {
if (n < 1) {
throw new IllegalArgumentException(DataNodePipeMessages.N_MUST_BE_0);
}
+ final BooleanSupplier effectiveAbortCondition =
+ abortCondition != null ? abortCondition : () -> false;
long current;
long next;
do {
+ if (effectiveAbortCondition.getAsBoolean()) {
+ return Sequence.INITIAL_VALUE;
+ }
+
current = cursor.get();
next = current + n;
@@ -130,6 +148,9 @@ public final class MultiProducerSequencer {
long gatingSequence =
Sequence.getMinimumSequence(gatingSequences.get(), current);
if (wrapPoint > gatingSequence) {
+ if (effectiveAbortCondition.getAsBoolean()) {
+ return Sequence.INITIAL_VALUE;
+ }
LockSupport.parkNanos(1);
continue;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java
index 4f7ba9e44b7..ea94cc85300 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
+import java.util.function.BooleanSupplier;
+
/**
* Left-hand side padding for cache line alignment
*
@@ -207,8 +209,26 @@ public final class RingBuffer<E> extends
RingBufferFields<E> {
* @param <A> argument type
*/
public <A> void publishEvent(EventTranslator<E, A> translator, A arg0) {
- final long sequence = sequencer.next(1);
+ publishEvent(translator, arg0, () -> false);
+ }
+
+ /**
+ * Publish event using a translator function, or abort if the caller is
closing.
+ *
+ * @param translator function to populate the event
+ * @param arg0 argument passed to translator
+ * @param abortCondition returns {@code true} if the publish should be
abandoned
+ * @param <A> argument type
+ * @return {@code true} if the event is published, {@code false} if the
publish is aborted
+ */
+ public <A> boolean publishEvent(
+ final EventTranslator<E, A> translator, final A arg0, final
BooleanSupplier abortCondition) {
+ final long sequence = sequencer.next(1, abortCondition);
+ if (sequence == Sequence.INITIAL_VALUE) {
+ return false;
+ }
translateAndPublish(translator, sequence, arg0);
+ return true;
}
/**
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
index 3fd40c4d4f2..ef57ea625ae 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/DisruptorShutdownTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -113,6 +114,61 @@ public class DisruptorShutdownTest {
Assert.assertFalse(processorThread.isAlive());
}
+ @Test
+ public void testUnexpectedInterruptDoesNotStopProcessor() throws Exception {
+ final AtomicReference<Thread> processorThreadReference = new
AtomicReference<>();
+ final ThreadFactory threadFactory =
+ runnable -> {
+ final Thread thread = new Thread(runnable,
"pipe-disruptor-unexpected-interrupt-test");
+ processorThreadReference.set(thread);
+ return thread;
+ };
+
+ final CountDownLatch handled = new CountDownLatch(1);
+ final Disruptor<TestEvent> disruptor = new Disruptor<>(TestEvent::new, 32,
threadFactory);
+ final RingBuffer<TestEvent> ringBuffer =
+ disruptor.handleEventsWith((event, sequence, endOfBatch) ->
handled.countDown()).start();
+
+ final Thread processorThread = processorThreadReference.get();
+ Assert.assertNotNull(processorThread);
+
+ TimeUnit.MILLISECONDS.sleep(50);
+ processorThread.interrupt();
+
+ ringBuffer.publishEvent((event, sequence, value) -> event.value = value,
1);
+ Assert.assertTrue(handled.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(processorThread.isAlive());
+
+ disruptor.shutdown();
+ Assert.assertFalse(processorThread.isAlive());
+ }
+
+ @Test
+ public void testPublishEventCanAbortWhenClosingWhileBufferIsFull() throws
Exception {
+ final RingBuffer<TestEvent> ringBuffer =
RingBuffer.createMultiProducer(TestEvent::new, 1);
+ final Sequence gatingSequence = new Sequence();
+ ringBuffer.addGatingSequences(gatingSequence);
+ ringBuffer.publishEvent((event, sequence, value) -> event.value = value,
1);
+
+ final AtomicBoolean isClosed = new AtomicBoolean(false);
+ final AtomicBoolean published = new AtomicBoolean(true);
+ final Thread publisherThread =
+ new Thread(
+ () ->
+ published.set(
+ ringBuffer.publishEvent(
+ (event, sequence, value) -> event.value = value, 2,
isClosed::get)),
+ "pipe-disruptor-publish-abort-test");
+
+ publisherThread.start();
+ TimeUnit.MILLISECONDS.sleep(50);
+ isClosed.set(true);
+ publisherThread.join(TimeUnit.SECONDS.toMillis(5));
+
+ Assert.assertFalse(publisherThread.isAlive());
+ Assert.assertFalse(published.get());
+ }
+
private static class TestEvent {
private int value;
}