This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch fix-pipe-ref-issue in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cf5737fcf3d451e317dcfa4d797c32e42020c961 Author: Steve Yurong Su <[email protected]> AuthorDate: Sat Aug 5 05:09:31 2023 +0800 refactor --- .../db/pipe/extractor/realtime/assigner/DisruptorQueue.java | 9 +++++++-- .../java/org/apache/iotdb/commons/concurrent/ThreadName.java | 2 ++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java index bb5ecf89b6b..b7823171da7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.extractor.realtime.assigner; +import org.apache.iotdb.commons.concurrent.IoTDBDaemonThreadFactory; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; @@ -27,10 +28,14 @@ import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; -import com.lmax.disruptor.util.DaemonThreadFactory; + +import static org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISRUPTOR; public class DisruptorQueue { + private static final IoTDBDaemonThreadFactory THREAD_FACTORY = + new IoTDBDaemonThreadFactory(PIPE_EXTRACTOR_DISRUPTOR.getName()); + private final Disruptor<EventContainer> disruptor; private final RingBuffer<EventContainer> ringBuffer; @@ -39,7 +44,7 @@ public class DisruptorQueue { new Disruptor<>( EventContainer::new, PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize(), - DaemonThreadFactory.INSTANCE, // TODO + THREAD_FACTORY, ProducerType.MULTI, new BlockingWaitStrategy()); disruptor.handleEventsWith( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index aff32b22695..0382f963a42 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -122,6 +122,7 @@ public enum ThreadName { GPRC_DEFAULT_WORKER_ELG("grpc-default-worker-ELG"), GROUP_MANAGEMENT("groupManagement"), // -------------------------- Compute -------------------------- + PIPE_EXTRACTOR_DISRUPTOR("Pipe-Extractor-Disruptor"), PIPE_ASSIGNER_EXECUTOR_POOL("Pipe-Assigner-Executor-Pool"), PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"), PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"), @@ -257,6 +258,7 @@ public enum ThreadName { private static final Set<ThreadName> computeThreadNames = new HashSet<>( Arrays.asList( + PIPE_EXTRACTOR_DISRUPTOR, PIPE_ASSIGNER_EXECUTOR_POOL, PIPE_PROCESSOR_EXECUTOR_POOL, PIPE_CONNECTOR_EXECUTOR_POOL,
