This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch fix-13-disruptor in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c5b0b5cb4ea1b3c5b5c176af8f50d2a7ba25a6c3 Author: Zhenyu Luo <[email protected]> AuthorDate: Thu Oct 23 17:16:00 2025 +0800 Pipe: Implementing DisruptorQueue (#16639) --- LICENSE | 12 + iotdb-core/datanode/pom.xml | 4 - .../event/common/heartbeat/PipeHeartbeatEvent.java | 4 +- .../realtime/assigner/DisruptorQueue.java | 12 +- .../assigner/DisruptorQueueExceptionHandler.java | 3 +- .../realtime/disruptor/BatchEventProcessor.java | 120 +++++++++ .../dataregion/realtime/disruptor/Disruptor.java | 135 ++++++++++ .../EventFactory.java} | 40 ++- .../EventHandler.java} | 43 ++- .../ExceptionHandler.java} | 43 +-- .../realtime/disruptor/MultiProducerSequencer.java | 259 ++++++++++++++++++ .../dataregion/realtime/disruptor/RingBuffer.java | 295 +++++++++++++++++++++ .../dataregion/realtime/disruptor/Sequence.java | 122 +++++++++ .../realtime/disruptor/SequenceBarrier.java | 78 ++++++ .../realtime/disruptor/SequenceGroups.java | 77 ++++++ pom.xml | 6 - 16 files changed, 1167 insertions(+), 86 deletions(-) diff --git a/LICENSE b/LICENSE index 3f3de45e81a..0a1e5e42a9e 100644 --- a/LICENSE +++ b/LICENSE @@ -303,3 +303,15 @@ The following files include code modified from Dropwizard Metrics project. Copyright (c) 2010-2013 Coda Hale, Yammer.com, 2014-2021 Dropwizard Team Project page: https://github.com/dropwizard/metrics License: https://github.com/dropwizard/metrics/blob/release/4.2.x/LICENSE + +-------------------------------------------------------------------------------- + +The following files include code modified from LMax Disruptor project. + +./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/* + +LMax Disruptor is open source software licensed under the Apache License 2.0 and supported by the Apache Software Foundation. +Project page: https://github.com/LMAX-Exchange/disruptor +License: https://github.com/LMAX-Exchange/disruptor/blob/master/LICENCE.txt + +-------------------------------------------------------------------------------- diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml index d6f8d124345..299e50f1be9 100644 --- a/iotdb-core/datanode/pom.xml +++ b/iotdb-core/datanode/pom.xml @@ -308,10 +308,6 @@ <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> - <dependency> - <groupId>com.lmax</groupId> - <artifactId>disruptor</artifactId> - </dependency> <dependency> <groupId>org.java-websocket</groupId> <artifactId>Java-WebSocket</artifactId> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java index ad29b285442..567e14f4fe3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java @@ -27,10 +27,10 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics; +import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.RingBuffer; import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.pipe.api.event.Event; -import com.lmax.disruptor.RingBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -183,7 +183,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent { /////////////////////////////// Queue size Reporting /////////////////////////////// - public void recordDisruptorSize(final RingBuffer<?> ringBuffer) { + public void recordDisruptorSize(final RingBuffer ringBuffer) { if (shouldPrintMessage) { disruptorSize = ringBuffer.getBufferSize() - (int) ringBuffer.remainingCapacity(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java index 2a2fa110749..e01161d6b3c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueue.java @@ -26,12 +26,13 @@ import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; +import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.Disruptor; +import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.EventHandler; +import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.RingBuffer; -import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; -import com.lmax.disruptor.dsl.ProducerType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,9 +76,8 @@ public class DisruptorQueue { 32, Math.toIntExact( allocatedMemoryBlock.getMemoryUsageInBytes() / ringBufferEntrySizeInBytes)), - THREAD_FACTORY, - ProducerType.MULTI, - new BlockingWaitStrategy()); + THREAD_FACTORY); + disruptor.handleEventsWith( (container, sequence, endOfBatch) -> { final PipeRealtimeEvent realtimeEvent = container.getEvent(); @@ -127,7 +127,7 @@ public class DisruptorQueue { private static class EventContainer { - private PipeRealtimeEvent event; + private volatile PipeRealtimeEvent event; private EventContainer() {} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java index 91ad0224fc5..5330f3486f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java @@ -19,7 +19,8 @@ package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner; -import com.lmax.disruptor.ExceptionHandler; +import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.ExceptionHandler; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java new file mode 100644 index 00000000000..34930be977e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/BatchEventProcessor.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Batch event processor for consuming events + * + * <p>This implementation is based on LMAX Disruptor (https://github.com/LMAX-Exchange/disruptor) + * and simplified for IoTDB's Pipe module (removed complex lifecycle management). + * + * <p>Core algorithm preserved from LMAX Disruptor: + * + * <ul> + * <li>Batch processing loop + * <li>Sequence tracking + * <li>endOfBatch detection + * </ul> + * + * @param <T> event type + */ +public final class BatchEventProcessor<T> implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(BatchEventProcessor.class); + + private final RingBuffer<T> ringBuffer; + private final SequenceBarrier sequenceBarrier; + private final EventHandler<? super T> eventHandler; + private final Sequence sequence = new Sequence(); + private ExceptionHandler<? super T> exceptionHandler = new DefaultExceptionHandler<>(); + private volatile boolean running = true; + + public BatchEventProcessor( + RingBuffer<T> ringBuffer, SequenceBarrier barrier, EventHandler<? super T> eventHandler) { + this.ringBuffer = ringBuffer; + this.sequenceBarrier = barrier; + this.eventHandler = eventHandler; + } + + public Sequence getSequence() { + return sequence; + } + + public void setExceptionHandler(ExceptionHandler<? super T> exceptionHandler) { + this.exceptionHandler = exceptionHandler; + } + + public void halt() { + running = false; + } + + @Override + public void run() { + T event = null; + long nextSequence = sequence.get() + 1L; + + while (running) { + try { + // Wait for available sequence + final long availableSequence = sequenceBarrier.waitFor(nextSequence); + + // Batch process all available events + while (nextSequence <= availableSequence) { + event = ringBuffer.get(nextSequence); + eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); + nextSequence++; + } + + // Update sequence + sequence.set(availableSequence); + + } catch (final InterruptedException ex) { + Thread.currentThread().interrupt(); + LOGGER.info("Processor interrupted"); + break; + } catch (final Throwable ex) { + exceptionHandler.handleEventException(ex, nextSequence, event); + sequence.set(nextSequence); + nextSequence++; + } + } + + LOGGER.info("Processor stopped"); + } + + private static class DefaultExceptionHandler<T> implements ExceptionHandler<T> { + @Override + public void handleEventException(Throwable ex, long sequence, T event) { + LoggerFactory.getLogger(getClass()).error("Exception processing: {} {}", sequence, event, ex); + } + + @Override + public void handleOnStartException(Throwable ex) { + LoggerFactory.getLogger(getClass()).error("Exception during onStart()", ex); + } + + @Override + public void handleOnShutdownException(Throwable ex) { + LoggerFactory.getLogger(getClass()).error("Exception during onShutdown()", ex); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java new file mode 100644 index 00000000000..57c6e853f61 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Disruptor.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ThreadFactory; + +/** + * Simplified Disruptor implementation for IoTDB Pipe + * + * <p>This implementation is based on LMAX Disruptor (https://github.com/LMAX-Exchange/disruptor) + * and simplified for IoTDB's specific use case in the Pipe module. + * + * <p>Key simplifications: + * + * <ul> + * <li>Single event handler support (no complex dependency graphs) + * <li>Simplified lifecycle management + * <li>Removed wait strategies (using simple sleep-based waiting) + * </ul> + * + * @param <T> event type + */ +public class Disruptor<T> { + private static final Logger LOGGER = LoggerFactory.getLogger(Disruptor.class); + + private final RingBuffer<T> ringBuffer; + private final ThreadFactory threadFactory; + private BatchEventProcessor<T> processor; + private Thread processorThread; + private ExceptionHandler<? super T> exceptionHandler; + private volatile boolean started = false; + + /** + * Create a Disruptor instance + * + * @param eventFactory factory for creating pre-allocated events + * @param ringBufferSize buffer size (must be power of 2) + * @param threadFactory factory for creating consumer thread + */ + public Disruptor(EventFactory<T> eventFactory, int ringBufferSize, ThreadFactory threadFactory) { + this.ringBuffer = RingBuffer.createMultiProducer(eventFactory, ringBufferSize); + this.threadFactory = threadFactory; + } + + /** + * Configure event handler for processing events + * + * <p>Creates a batch event processor that will run in its own thread + * + * @param handler event handler implementation + * @return this instance for method chaining + */ + public Disruptor<T> handleEventsWith(final EventHandler<? super T> handler) { + SequenceBarrier barrier = ringBuffer.newBarrier(); + processor = new BatchEventProcessor<>(ringBuffer, barrier, handler); + + if (exceptionHandler != null) { + processor.setExceptionHandler(exceptionHandler); + } + + ringBuffer.addGatingSequences(processor.getSequence()); + return this; + } + + /** + * Set exception handler for error handling + * + * @param exceptionHandler handler for processing exceptions + */ + public void setDefaultExceptionHandler(ExceptionHandler<? super T> exceptionHandler) { + this.exceptionHandler = exceptionHandler; + if (processor != null) { + processor.setExceptionHandler(exceptionHandler); + } + } + + public RingBuffer<T> start() { + if (started) { + throw new IllegalStateException("Disruptor already started"); + } + + if (processor == null) { + throw new IllegalStateException("No event handler configured"); + } + + processorThread = threadFactory.newThread(processor); + processorThread.start(); + started = true; + + LOGGER.info("Disruptor started with buffer size: {}", ringBuffer.getBufferSize()); + return ringBuffer; + } + + public void shutdown() { + if (!started) { + return; + } + + if (processor != null) { + processor.halt(); + } + + if (processorThread != null) { + try { + processorThread.join(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.warn("Interrupted waiting for processor to stop"); + } + } + + started = false; + LOGGER.info("Disruptor shutdown completed"); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventFactory.java similarity index 50% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventFactory.java index 91ad0224fc5..785033c6824 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventFactory.java @@ -17,28 +17,22 @@ * under the License. */ -package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner; +package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor; -import com.lmax.disruptor.ExceptionHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DisruptorQueueExceptionHandler implements ExceptionHandler<Object> { - private static final Logger LOGGER = - LoggerFactory.getLogger(DisruptorQueueExceptionHandler.class); - - @Override - public void handleEventException(final Throwable ex, final long sequence, final Object event) { - LOGGER.error("Exception processing: {} {}", sequence, event, ex); - } - - @Override - public void handleOnStartException(final Throwable ex) { - LOGGER.warn("Exception during onStart()", ex); - } - - @Override - public void handleOnShutdownException(final Throwable ex) { - LOGGER.warn("Exception during onShutdown()", ex); - } +/** + * Event factory for pre-allocating events in RingBuffer + * + * <p>This interface is based on LMAX Disruptor (https://github.com/LMAX-Exchange/disruptor) and + * adapted for IoTDB's Pipe module. + * + * @param <T> event type + */ +@FunctionalInterface +public interface EventFactory<T> { + /** + * Create new event instance + * + * @return new event + */ + T newInstance(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventHandler.java similarity index 50% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventHandler.java index 91ad0224fc5..1fc81a37623 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/EventHandler.java @@ -17,28 +17,25 @@ * under the License. */ -package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner; +package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor; -import com.lmax.disruptor.ExceptionHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DisruptorQueueExceptionHandler implements ExceptionHandler<Object> { - private static final Logger LOGGER = - LoggerFactory.getLogger(DisruptorQueueExceptionHandler.class); - - @Override - public void handleEventException(final Throwable ex, final long sequence, final Object event) { - LOGGER.error("Exception processing: {} {}", sequence, event, ex); - } - - @Override - public void handleOnStartException(final Throwable ex) { - LOGGER.warn("Exception during onStart()", ex); - } - - @Override - public void handleOnShutdownException(final Throwable ex) { - LOGGER.warn("Exception during onShutdown()", ex); - } +/** + * Event handler for processing events from RingBuffer + * + * <p>This interface is based on LMAX Disruptor (https://github.com/LMAX-Exchange/disruptor) and + * adapted for IoTDB's Pipe module. + * + * @param <T> event type + */ +@FunctionalInterface +public interface EventHandler<T> { + /** + * Handle event + * + * @param event the event + * @param sequence sequence number + * @param endOfBatch whether this is the last event in current batch + * @throws Exception if processing fails + */ + void onEvent(T event, long sequence, boolean endOfBatch) throws Exception; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/ExceptionHandler.java similarity index 50% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/ExceptionHandler.java index 91ad0224fc5..28396b51ffe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/DisruptorQueueExceptionHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/ExceptionHandler.java @@ -17,28 +17,29 @@ * under the License. */ -package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner; +package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor; -import com.lmax.disruptor.ExceptionHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class DisruptorQueueExceptionHandler implements ExceptionHandler<Object> { - private static final Logger LOGGER = - LoggerFactory.getLogger(DisruptorQueueExceptionHandler.class); - - @Override - public void handleEventException(final Throwable ex, final long sequence, final Object event) { - LOGGER.error("Exception processing: {} {}", sequence, event, ex); - } +/** + * Exception handler for event processing errors + * + * <p>This interface is based on LMAX Disruptor (https://github.com/LMAX-Exchange/disruptor) and + * adapted for IoTDB's Pipe module. + * + * @param <T> event type + */ +public interface ExceptionHandler<T> { + /** + * Handle exception during event processing + * + * @param ex exception + * @param sequence sequence number + * @param event the event + */ + void handleEventException(Throwable ex, long sequence, T event); - @Override - public void handleOnStartException(final Throwable ex) { - LOGGER.warn("Exception during onStart()", ex); - } + /** Handle exception during processor start */ + void handleOnStartException(Throwable ex); - @Override - public void handleOnShutdownException(final Throwable ex) { - LOGGER.warn("Exception during onShutdown()", ex); - } + /** Handle exception during processor shutdown */ + void handleOnShutdownException(Throwable ex); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java new file mode 100644 index 00000000000..d40ed968398 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/MultiProducerSequencer.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor; + +import java.util.concurrent.atomic.AtomicIntegerArray; +import java.util.concurrent.locks.LockSupport; + +/** + * Multi-producer sequencer for coordinating concurrent publishers + * + * <p>This implementation is based on LMAX Disruptor (https://github.com/LMAX-Exchange/disruptor) + * and preserves the core lock-free multi-producer algorithm for IoTDB's Pipe module. + * + * <p>Key features preserved from LMAX Disruptor: + * + * <ul> + * <li>Lock-free CAS-based sequence claiming + * <li>Availability buffer for out-of-order publishing detection + * <li>Backpressure via gating sequences + * <li>Cache line padding to prevent false sharing + * </ul> + */ +public final class MultiProducerSequencer { + + /** Ring buffer size (must be power of 2) - immutable after construction */ + private final int bufferSize; + + /** + * Producer cursor tracking highest claimed sequence Updated via CAS in next() method Volatile + * reads/writes handled by Sequence class + */ + private final Sequence cursor = new Sequence(); + + /** + * Array of consumer sequences for backpressure control MUST be volatile for safe publication when + * modified by SequenceGroups Array reference is replaced atomically via + * AtomicReferenceFieldUpdater + */ + volatile Sequence[] gatingSequences; + + /** + * Cached minimum gating sequence to reduce contention Updated opportunistically in next() to + * avoid expensive array scan Does not need to be perfectly accurate (conservative is safe) + */ + private final Sequence gatingSequenceCache = new Sequence(); + + /** + * CRITICAL: Availability flags for tracking published sequences + * + * <p>Handles out-of-order publishing in multi-producer scenario: - Thread A claims seq 10, still + * writing - Thread B claims seq 11, finishes and publishes - Consumer MUST wait for seq 10 before + * reading seq 11 + * + * <p>Memory visibility guarantees: - Writers use lazySet() for store-store barrier (cheaper than + * volatile write) - Readers use get() for volatile read (ensures visibility across threads) + * + * <p>AtomicIntegerArray provides same semantics as Unsafe without reflection + */ + private final AtomicIntegerArray availableBuffer; + + /** Mask for fast modulo: sequence & indexMask == sequence % bufferSize */ + private final int indexMask; + + /** Shift for calculating wrap count: sequence >>> indexShift */ + private final int indexShift; + + public MultiProducerSequencer(int bufferSize, Sequence[] gatingSequences) { + if (bufferSize < 1) { + throw new IllegalArgumentException("bufferSize must not be less than 1"); + } + if (Integer.bitCount(bufferSize) != 1) { + throw new IllegalArgumentException("bufferSize must be a power of 2"); + } + + this.bufferSize = bufferSize; + this.gatingSequences = gatingSequences != null ? gatingSequences : new Sequence[0]; + this.availableBuffer = new AtomicIntegerArray(bufferSize); + this.indexMask = bufferSize - 1; + this.indexShift = log2(bufferSize); + + initialiseAvailableBuffer(); + } + + /** + * Claim next n sequences for publishing + * + * <p>Uses CAS loop to atomically claim sequence numbers. Implements backpressure by parking when + * buffer is full. + * + * @param n number of sequences to claim + * @return highest claimed sequence number + */ + public long next(int n) { + if (n < 1) { + throw new IllegalArgumentException("n must be > 0"); + } + + long current; + long next; + + do { + current = cursor.get(); + next = current + n; + + final long wrapPoint = next - bufferSize; + final long cachedGatingSequence = gatingSequenceCache.get(); + + if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { + long gatingSequence = Sequence.getMinimumSequence(gatingSequences, current); + + if (wrapPoint > gatingSequence) { + LockSupport.parkNanos(1); + continue; + } + + gatingSequenceCache.set(gatingSequence); + } else if (cursor.compareAndSet(current, next)) { + break; + } + } while (true); + + return next; + } + + /** Publish sequence */ + public void publish(final long sequence) { + setAvailable(sequence); + } + + /** Publish batch */ + public void publish(long lo, long hi) { + for (long l = lo; l <= hi; l++) { + setAvailable(l); + } + } + + /** + * CORE: Check if sequence is available for consumption Uses volatile read to ensure visibility of + * published sequences + */ + public boolean isAvailable(long sequence) { + int index = calculateIndex(sequence); + int flag = calculateAvailabilityFlag(sequence); + return availableBuffer.get(index) == flag; + } + + /** CORE: Get highest published - exact same algorithm */ + public long getHighestPublishedSequence(long lowerBound, long availableSequence) { + for (long sequence = lowerBound; sequence <= availableSequence; sequence++) { + if (!isAvailable(sequence)) { + return sequence - 1; + } + } + return availableSequence; + } + + public Sequence getCursor() { + return cursor; + } + + public int getBufferSize() { + return bufferSize; + } + + public long remainingCapacity() { + long consumed = Sequence.getMinimumSequence(gatingSequences, cursor.get()); + long produced = cursor.get(); + return bufferSize - (produced - consumed); + } + + /** + * Add gating sequences for consumer tracking + * + * <p>Atomically adds sequences to track consumer progress + * + * @param gatingSequences consumer sequences to add + */ + public void addGatingSequences(Sequence... gatingSequences) { + SequenceGroups.addSequences(this, this.cursor, gatingSequences); + } + + /** + * Create a sequence barrier for consumers + * + * @param sequencesToTrack upstream sequences to wait for + * @return new sequence barrier + */ + public SequenceBarrier newBarrier(Sequence... sequencesToTrack) { + return new SequenceBarrier(this, sequencesToTrack); + } + + /** Initialize available buffer */ + private void initialiseAvailableBuffer() { + for (int i = availableBuffer.length() - 1; i != 0; i--) { + setAvailableBufferValue(i, -1); + } + setAvailableBufferValue(0, -1); + } + + /** + * CORE: Mark sequence as available for consumption + * + * <p>Uses lazySet() which provides: - Store-store barrier (ensures all prior writes are visible) + * - Cheaper than full volatile write (no store-load barrier) - Sufficient for this use case + * (readers use volatile get) + */ + private void setAvailable(final long sequence) { + setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence)); + } + + /** + * Set availability flag with release semantics lazySet() ensures previous event writes are + * visible before flag update + */ + private void setAvailableBufferValue(int index, int flag) { + availableBuffer.lazySet(index, flag); + } + + /** Calculate availability flag */ + private int calculateAvailabilityFlag(final long sequence) { + return (int) (sequence >>> indexShift); + } + + /** Calculate index */ + private int calculateIndex(final long sequence) { + return ((int) sequence) & indexMask; + } + + /** + * Calculate log2 for index shift calculation + * + * @param i input value (must be power of 2) + * @return log2 of input + */ + private static int log2(int i) { + int r = 0; + while ((i >>= 1) != 0) { + ++r; + } + return r; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java new file mode 100644 index 00000000000..2af784b603d --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/RingBuffer.java @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor; + +/** + * Left-hand side padding for cache line alignment + * + * <p>Prevents false sharing by ensuring RingBuffer fields don't share cache lines with preceding + * objects + */ +abstract class RingBufferPad { + protected long p1, p2, p3, p4, p5, p6, p7; +} + +/** + * Core fields for RingBuffer implementation + * + * <p>Contains the actual event storage array and sequencing state + */ +abstract class RingBufferFields<E> extends RingBufferPad { + /** Pre-allocated event storage with padding to prevent false sharing */ + private final Object[] entries; + + /** Total number of events in the buffer (must be power of 2) */ + protected final int bufferSize; + + /** Mask for fast modulo operation (bufferSize - 1) */ + protected final int indexMask; + + /** Sequencer for managing producer/consumer coordination */ + protected final MultiProducerSequencer sequencer; + + /** + * Initialize ring buffer fields + * + * @param eventFactory factory for pre-allocating events + * @param sequencer multi-producer sequencer + */ + RingBufferFields(EventFactory<E> eventFactory, MultiProducerSequencer sequencer) { + this.sequencer = sequencer; + this.bufferSize = sequencer.getBufferSize(); + + if (bufferSize < 1) { + throw new IllegalArgumentException("bufferSize must not be less than 1"); + } + if (Integer.bitCount(bufferSize) != 1) { + throw new IllegalArgumentException("bufferSize must be a power of 2"); + } + + this.indexMask = bufferSize - 1; + // Allocate array with padding on both sides to prevent false sharing + this.entries = new Object[bufferSize]; + fill(eventFactory); + } + + /** + * Pre-allocate all events in the buffer + * + * @param eventFactory factory for creating event instances + */ + private void fill(EventFactory<E> eventFactory) { + for (int i = 0; i < bufferSize; i++) { + // Store events starting after front padding + entries[i] = eventFactory.newInstance(); + } + } + + /** + * Get event at sequence using direct memory access + * + * @param sequence sequence number + * @return event at the sequence position + */ + @SuppressWarnings("unchecked") + protected final E elementAt(long sequence) { + // Use Unsafe for lock-free array access with proper memory barriers + return (E) entries[(int) (sequence & indexMask)]; + } +} + +/** + * Lock-free ring buffer for storing pre-allocated event objects + * + * <p>This implementation is based on LMAX Disruptor (https://github.com/LMAX-Exchange/disruptor) + * and preserves the core ring buffer algorithm for IoTDB's Pipe module. + * + * <p>Supports multi-producer concurrent access with zero-garbage design. Events are pre-allocated + * and reused, avoiding GC pressure. Uses cache line padding to prevent false sharing. + * + * @param <E> event type + */ +public final class RingBuffer<E> extends RingBufferFields<E> { + /** Initial cursor value for the ring buffer */ + public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE; + + /** + * Right-hand side padding for cache line alignment + * + * <p>Prevents false sharing by ensuring RingBuffer fields don't share cache lines with following + * objects + */ + protected long p1, p2, p3, p4, p5, p6, p7; + + /** + * Construct a RingBuffer with given factory and sequencer + * + * @param eventFactory factory to create and pre-allocate events + * @param sequencer multi-producer sequencer for sequence management + */ + private RingBuffer(EventFactory<E> eventFactory, MultiProducerSequencer sequencer) { + super(eventFactory, sequencer); + } + + /** + * Create a multi-producer RingBuffer + * + * <p>Supports concurrent publishing from multiple threads using lock-free CAS operations + * + * @param factory event factory for creating event instances + * @param bufferSize buffer size (must be power of 2) + * @param <E> event type + * @return newly created ring buffer + */ + public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize) { + MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, new Sequence[0]); + return new RingBuffer<>(factory, sequencer); + } + + /** + * Get the event at a specific sequence + * + * @param sequence sequence number to retrieve + * @return event at the given sequence + */ + public E get(long sequence) { + return elementAt(sequence); + } + + /** + * Claim the next sequence for publishing + * + * <p>Blocks if buffer is full until space becomes available + * + * @return claimed sequence number + */ + public long next() { + return sequencer.next(1); + } + + /** + * Claim next n sequences for batch publishing + * + * @param n number of sequences to claim + * @return highest claimed sequence number + */ + public long next(int n) { + return sequencer.next(n); + } + + /** + * Publish a single sequence + * + * <p>Makes the event at this sequence visible to consumers + * + * @param sequence sequence to publish + */ + public void publish(long sequence) { + sequencer.publish(sequence); + } + + /** + * Publish a batch of sequences + * + * @param lo lowest sequence in the batch (inclusive) + * @param hi highest sequence in the batch (inclusive) + */ + public void publish(long lo, long hi) { + sequencer.publish(lo, hi); + } + + /** + * Publish event using a translator function + * + * <p>Provides a higher-level API for publishing events with custom translation logic + * + * @param translator function to populate the event + * @param arg0 argument passed to translator + * @param <A> argument type + */ + public <A> void publishEvent(EventTranslator<E, A> translator, A arg0) { + final long sequence = sequencer.next(1); + translateAndPublish(translator, sequence, arg0); + } + + /** + * Translate event and publish atomically + * + * @param translator event translator function + * @param sequence claimed sequence number + * @param arg0 argument for translation + * @param <A> argument type + */ + private <A> void translateAndPublish(EventTranslator<E, A> translator, long sequence, A arg0) { + try { + translator.translateTo(get(sequence), sequence, arg0); + } finally { + sequencer.publish(sequence); + } + } + + /** + * Add gating sequences for consumer tracking + * + * <p>Gating sequences represent consumer progress and prevent overwriting unprocessed events + * + * @param gatingSequences consumer sequences to track + */ + public void addGatingSequences(Sequence... gatingSequences) { + sequencer.addGatingSequences(gatingSequences); + } + + /** + * Create a sequence barrier for consumers + * + * <p>Barrier coordinates when events become available for processing + * + * @param sequencesToTrack upstream sequences to wait for + * @return new sequence barrier + */ + public SequenceBarrier newBarrier(Sequence... sequencesToTrack) { + return sequencer.newBarrier(sequencesToTrack); + } + + /** + * Get current producer cursor position + * + * @return current cursor value + */ + public long getCursor() { + return sequencer.getCursor().get(); + } + + /** + * Get the buffer size + * + * @return configured buffer size + */ + public int getBufferSize() { + return bufferSize; + } + + /** + * Get remaining capacity in the buffer + * + * @return number of available slots + */ + public long remainingCapacity() { + return sequencer.remainingCapacity(); + } + + /** + * Function interface for translating data into events + * + * @param <E> event type + * @param <A> argument type + */ + @FunctionalInterface + public interface EventTranslator<E, A> { + /** + * Translate argument into event + * + * @param event pre-allocated event to populate + * @param sequence sequence number for this event + * @param arg source data + */ + void translateTo(E event, long sequence, A arg); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Sequence.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Sequence.java new file mode 100644 index 00000000000..1f1d3445969 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/Sequence.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor; + +import java.util.concurrent.atomic.AtomicLong; + +/** Left-hand side padding for cache line alignment */ +class LhsPadding { + protected long p1, p2, p3, p4, p5, p6, p7; +} + +/** Value class holding the actual sequence */ +class Value extends LhsPadding { + protected AtomicLong value = new AtomicLong(); +} + +/** Right-hand side padding for cache line alignment */ +class RhsPadding extends Value { + protected long p9, p10, p11, p12, p13, p14, p15; +} + +/** + * Lock-free sequence counter with cache line padding + * + * <p>This implementation is based on LMAX Disruptor (https://github.com/LMAX-Exchange/disruptor) + * and preserves the core sequence tracking mechanism for IoTDB's Pipe module. + * + * <p>Key design features: + * + * <ul> + * <li>Three-level inheritance ensures proper field ordering for padding + * <li>Uses AtomicLong for thread-safe atomic operations + * <li>Cache line padding prevents false sharing between CPU cores + * <li>Supports both ordered writes (cheaper) and volatile writes (stronger) + * </ul> + */ +public class Sequence extends RhsPadding { + public static final long INITIAL_VALUE = -1L; + + /** Create sequence with initial value -1 */ + public Sequence() { + value.set(INITIAL_VALUE); + } + + /** Volatile read */ + public long get() { + return value.get(); + } + + /** + * Ordered write (store-store barrier only) + * + * <p>CRITICAL: Cheaper than volatile write, sufficient for most cases + */ + public void set(final long value) { + this.value.set(value); + } + + /** + * CAS operation - CORE for lock-free design + * + * @param expectedValue expected current value + * @param newValue new value + * @return true if successful + */ + public boolean compareAndSet(final long expectedValue, final long newValue) { + return value.compareAndSet(expectedValue, newValue); + } + + /** Atomically increment */ + public long incrementAndGet() { + return addAndGet(1L); + } + + /** Atomically add */ + public long addAndGet(final long increment) { + long currentValue; + long newValue; + + do { + currentValue = get(); + newValue = currentValue + increment; + } while (!compareAndSet(currentValue, newValue)); + + return newValue; + } + + @Override + public String toString() { + return Long.toString(get()); + } + + /** Get minimum sequence from array - CORE utility method */ + public static long getMinimumSequence(final Sequence[] sequences, long minimum) { + for (int i = 0, n = sequences.length; i < n; i++) { + long value = sequences[i].get(); + minimum = Math.min(minimum, value); + } + return minimum; + } + + public static long getMinimumSequence(final Sequence[] sequences) { + return getMinimumSequence(sequences, Long.MAX_VALUE); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java new file mode 100644 index 00000000000..4c8011eb1c2 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceBarrier.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor; + +/** + * Sequence barrier for consumer coordination + * + * <p>This implementation is based on LMAX Disruptor (https://github.com/LMAX-Exchange/disruptor) + * and simplified for IoTDB's Pipe module (removed Alert mechanism - IoTDB doesn't need it). + * + * <p>Core features preserved from LMAX Disruptor: + * + * <ul> + * <li>waitFor() logic for waiting sequences + * <li>Scan available buffer for out-of-order publishing + * </ul> + */ +public class SequenceBarrier { + private final MultiProducerSequencer sequencer; + private final Sequence[] dependentSequences; + + public SequenceBarrier(MultiProducerSequencer sequencer, Sequence[] dependentSequences) { + this.sequencer = sequencer; + this.dependentSequences = dependentSequences != null ? dependentSequences : new Sequence[0]; + } + + /** + * CORE: Wait for sequence to become available (MUST keep logic) + * + * @param sequence sequence to wait for + * @return highest available sequence + * @throws InterruptedException if interrupted + */ + public long waitFor(long sequence) throws InterruptedException { + // Wait for cursor + long availableSequence; + while ((availableSequence = sequencer.getCursor().get()) < sequence) { + Thread.sleep(1); + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + } + + // Wait for dependent sequences + if (dependentSequences.length > 0) { + while (Sequence.getMinimumSequence(dependentSequences) < sequence) { + Thread.sleep(1); + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException(); + } + } + } + + // CORE: Scan available buffer for highest continuously published sequence + return sequencer.getHighestPublishedSequence(sequence, availableSequence); + } + + public long getCursor() { + return sequencer.getCursor().get(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java new file mode 100644 index 00000000000..af5039070f0 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/SequenceGroups.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor; + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +/** + * Utility for atomic management of sequence arrays + * + * <p>This implementation is based on LMAX Disruptor (https://github.com/LMAX-Exchange/disruptor) + * and adapted for IoTDB's Pipe module. + * + * <p>Provides thread-safe operations for adding and removing sequences from gating sequence arrays + * used to track consumer progress. + */ +final class SequenceGroups { + + /** Field updater for atomic array replacement */ + private static final AtomicReferenceFieldUpdater<MultiProducerSequencer, Sequence[]> + SEQUENCE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater( + MultiProducerSequencer.class, Sequence[].class, "gatingSequences"); + + /** + * Atomically add sequences to the gating sequence array + * + * <p>Uses CAS loop to ensure thread-safe addition even under concurrent modification + * + * @param sequencer the multi-producer sequencer + * @param cursor the current cursor sequence + * @param sequencesToAdd sequences to add + */ + static void addSequences( + final MultiProducerSequencer sequencer, + final Sequence cursor, + final Sequence... sequencesToAdd) { + long cursorSequence; + Sequence[] updatedSequences; + Sequence[] currentSequences; + + do { + currentSequences = sequencer.gatingSequences; + updatedSequences = new Sequence[currentSequences.length + sequencesToAdd.length]; + System.arraycopy(currentSequences, 0, updatedSequences, 0, currentSequences.length); + + cursorSequence = cursor.get(); + + int index = currentSequences.length; + for (Sequence sequence : sequencesToAdd) { + sequence.set(cursorSequence); + updatedSequences[index++] = sequence; + } + } while (!SEQUENCE_UPDATER.compareAndSet(sequencer, currentSequences, updatedSequences)); + + cursorSequence = cursor.get(); + for (Sequence sequence : sequencesToAdd) { + sequence.set(cursorSequence); + } + } +} diff --git a/pom.xml b/pom.xml index 294a1231e35..cbe907c71b9 100644 --- a/pom.xml +++ b/pom.xml @@ -75,7 +75,6 @@ <commons-pool2.version>2.11.1</commons-pool2.version> <commons.collections4.version>4.4</commons.collections4.version> <ctest.skip.tests>false</ctest.skip.tests> - <disruptor.version>3.4.4</disruptor.version> <drill.freemarker.maven.plugin.version>1.21.1</drill.freemarker.maven.plugin.version> <dropwizard.metrics.version>4.2.19</dropwizard.metrics.version> <eclipse-collections.version>11.1.0</eclipse-collections.version> @@ -454,11 +453,6 @@ <artifactId>h2-mvstore</artifactId> <version>${h2.version}</version> </dependency> - <dependency> - <groupId>com.lmax</groupId> - <artifactId>disruptor</artifactId> - <version>${disruptor.version}</version> - </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId>
