This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 56775a3a816 Pipe: Fixed the CPU consuming problem when configNode has nothing to sync (#12359)
56775a3a816 is described below

commit 56775a3a816136f4b822a009306f3554c697dcb5
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 17 15:31:28 2024 +0800

    Pipe: Fixed the CPU consuming problem when configNode has nothing to sync (#12359)
---
 .../pipe/extractor/IoTDBConfigRegionExtractor.java      | 17 +++++++++++++----
 .../schemaregion/IoTDBSchemaRegionExtractor.java        | 16 ++++++++++++----
 .../pipe/extractor/IoTDBNonDataRegionExtractor.java     |  4 +++-
 3 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java
index 91d8a21b70c..30de357cc6f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.manager.pipe.extractor;
 
 import org.apache.iotdb.commons.consensus.ConfigRegionId;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.datastructure.queue.listening.AbstractPipeListeningQueue;
 import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
 import org.apache.iotdb.commons.pipe.extractor.IoTDBNonDataRegionExtractor;
@@ -46,7 +47,7 @@ public class IoTDBConfigRegionExtractor extends IoTDBNonDataRegionExtractor {
 
   // TODO: Delete this
   @Override
-  public void validate(PipeParameterValidator validator) throws Exception {
+  public void validate(final PipeParameterValidator validator) throws Exception {
     if (ConfigNodeDescriptor.getInstance()
         .getConf()
         .getConfigNodeConsensusProtocolClass()
@@ -58,7 +59,8 @@ public class IoTDBConfigRegionExtractor extends IoTDBNonDataRegionExtractor {
   }
 
   @Override
-  public void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration)
+  public void customize(
+      final PipeParameters parameters, final PipeExtractorRuntimeConfiguration configuration)
       throws Exception {
     super.customize(parameters, configuration);
     listenedTypeSet = ConfigRegionListeningFilter.parseListeningPlanTypeSet(parameters);
@@ -90,13 +92,20 @@ public class IoTDBConfigRegionExtractor extends IoTDBNonDataRegionExtractor {
   }
 
   @Override
-  protected boolean isTypeListened(Event event) {
+  protected long getMaxBlockingTimeMs() {
+    // The connector continues to submit and relies on the queue to sleep if empty
+    // Here we return with block to be consistent with the dataNode connector
+    return PipeConfig.getInstance().getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
+  }
+
+  @Override
+  protected boolean isTypeListened(final Event event) {
     return listenedTypeSet.contains(
         ((PipeConfigRegionWritePlanEvent) event).getConfigPhysicalPlan().getType());
   }
 
   @Override
-  protected void confineHistoricalEventTransferTypes(PipeSnapshotEvent event) {
+  protected void confineHistoricalEventTransferTypes(final PipeSnapshotEvent event) {
     ((PipeConfigRegionSnapshotEvent) event).confineTransferredTypes(listenedTypeSet);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
index 6b6136e3f34..5c72cb74cf2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
@@ -51,7 +51,7 @@ public class IoTDBSchemaRegionExtractor extends IoTDBNonDataRegionExtractor {
 
   // TODO: Delete this
   @Override
-  public void validate(PipeParameterValidator validator) throws Exception {
+  public void validate(final PipeParameterValidator validator) throws Exception {
     if (IoTDBDescriptor.getInstance()
         .getConfig()
         .getSchemaRegionConsensusProtocolClass()
@@ -63,7 +63,8 @@ public class IoTDBSchemaRegionExtractor extends IoTDBNonDataRegionExtractor {
   }
 
   @Override
-  public void customize(PipeParameters parameters, PipeExtractorRuntimeConfiguration configuration)
+  public void customize(
+      final PipeParameters parameters, final PipeExtractorRuntimeConfiguration configuration)
       throws Exception {
     super.customize(parameters, configuration);
 
@@ -109,19 +110,26 @@ public class IoTDBSchemaRegionExtractor extends IoTDBNonDataRegionExtractor {
     return PipeAgent.runtime().isSchemaLeaderReady(schemaRegionId) ? super.supply() : null;
   }
 
+  @Override
+  protected long getMaxBlockingTimeMs() {
+    // The dataNode processor can sleep if it supplies null
+    // Here we return immediately to be consistent with the data region extractor
+    return 0;
+  }
+
   @Override
   protected AbstractPipeListeningQueue getListeningQueue() {
     return PipeAgent.runtime().schemaListener(schemaRegionId);
   }
 
   @Override
-  protected boolean isTypeListened(Event event) {
+  protected boolean isTypeListened(final Event event) {
     return listenedTypeSet.contains(
         ((PipeSchemaRegionWritePlanEvent) event).getPlanNode().getType());
   }
 
   @Override
-  protected void confineHistoricalEventTransferTypes(PipeSnapshotEvent event) {
+  protected void confineHistoricalEventTransferTypes(final PipeSnapshotEvent event) {
     ((PipeSchemaRegionSnapshotEvent) event).confineTransferredTypes(listenedTypeSet);
   }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
index b56e79efe39..ac4758103a3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
@@ -139,7 +139,7 @@ public abstract class IoTDBNonDataRegionExtractor extends IoTDBExtractor {
     // Realtime
     EnrichedEvent realtimeEvent;
     do {
-      realtimeEvent = (EnrichedEvent) iterator.next(0);
+      realtimeEvent = (EnrichedEvent) iterator.next(getMaxBlockingTimeMs());
       if (Objects.isNull(realtimeEvent)) {
         return null;
       }
@@ -154,6 +154,8 @@ public abstract class IoTDBNonDataRegionExtractor extends IoTDBExtractor {
     return realtimeEvent;
   }
 
+  protected abstract long getMaxBlockingTimeMs();
+
   protected abstract boolean isTypeListened(Event event);
 
   protected abstract void confineHistoricalEventTransferTypes(PipeSnapshotEvent event);

Reply via email to