This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-pipe-ref-issue
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit cf5737fcf3d451e317dcfa4d797c32e42020c961
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Aug 5 05:09:31 2023 +0800

    refactor
---
 .../db/pipe/extractor/realtime/assigner/DisruptorQueue.java      | 9 +++++++--
 .../java/org/apache/iotdb/commons/concurrent/ThreadName.java     | 2 ++
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
index bb5ecf89b6b..b7823171da7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/assigner/DisruptorQueue.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.extractor.realtime.assigner;
 
+import org.apache.iotdb.commons.concurrent.IoTDBDaemonThreadFactory;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 
@@ -27,10 +28,14 @@ import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
-import com.lmax.disruptor.util.DaemonThreadFactory;
+
+import static org.apache.iotdb.commons.concurrent.ThreadName.PIPE_EXTRACTOR_DISRUPTOR;
 
 public class DisruptorQueue {
 
+  private static final IoTDBDaemonThreadFactory THREAD_FACTORY =
+      new IoTDBDaemonThreadFactory(PIPE_EXTRACTOR_DISRUPTOR.getName());
+
   private final Disruptor<EventContainer> disruptor;
   private final RingBuffer<EventContainer> ringBuffer;
 
@@ -39,7 +44,7 @@ public class DisruptorQueue {
         new Disruptor<>(
             EventContainer::new,
             PipeConfig.getInstance().getPipeExtractorAssignerDisruptorRingBufferSize(),
-            DaemonThreadFactory.INSTANCE, // TODO
+            THREAD_FACTORY,
             ProducerType.MULTI,
             new BlockingWaitStrategy());
     disruptor.handleEventsWith(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index aff32b22695..0382f963a42 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -122,6 +122,7 @@ public enum ThreadName {
   GPRC_DEFAULT_WORKER_ELG("grpc-default-worker-ELG"),
   GROUP_MANAGEMENT("groupManagement"),
   // -------------------------- Compute --------------------------
+  PIPE_EXTRACTOR_DISRUPTOR("Pipe-Extractor-Disruptor"),
   PIPE_ASSIGNER_EXECUTOR_POOL("Pipe-Assigner-Executor-Pool"),
   PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"),
   PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
@@ -257,6 +258,7 @@ public enum ThreadName {
   private static final Set<ThreadName> computeThreadNames =
       new HashSet<>(
           Arrays.asList(
+              PIPE_EXTRACTOR_DISRUPTOR,
               PIPE_ASSIGNER_EXECUTOR_POOL,
               PIPE_PROCESSOR_EXECUTOR_POOL,
               PIPE_CONNECTOR_EXECUTOR_POOL,

Reply via email to