This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 56775a3a816 Pipe: Fixed the CPU consuming problem when configNode has
nothing to sync (#12359)
56775a3a816 is described below
commit 56775a3a816136f4b822a009306f3554c697dcb5
Author: Caideyipi <[email protected]>
AuthorDate: Wed Apr 17 15:31:28 2024 +0800
Pipe: Fixed the CPU consuming problem when configNode has nothing to sync
(#12359)
---
.../pipe/extractor/IoTDBConfigRegionExtractor.java | 17 +++++++++++++----
.../schemaregion/IoTDBSchemaRegionExtractor.java | 16 ++++++++++++----
.../pipe/extractor/IoTDBNonDataRegionExtractor.java | 4 +++-
3 files changed, 28 insertions(+), 9 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java
index 91d8a21b70c..30de357cc6f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/extractor/IoTDBConfigRegionExtractor.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.manager.pipe.extractor;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import
org.apache.iotdb.commons.pipe.datastructure.queue.listening.AbstractPipeListeningQueue;
import org.apache.iotdb.commons.pipe.event.PipeSnapshotEvent;
import org.apache.iotdb.commons.pipe.extractor.IoTDBNonDataRegionExtractor;
@@ -46,7 +47,7 @@ public class IoTDBConfigRegionExtractor extends
IoTDBNonDataRegionExtractor {
// TODO: Delete this
@Override
- public void validate(PipeParameterValidator validator) throws Exception {
+ public void validate(final PipeParameterValidator validator) throws
Exception {
if (ConfigNodeDescriptor.getInstance()
.getConf()
.getConfigNodeConsensusProtocolClass()
@@ -58,7 +59,8 @@ public class IoTDBConfigRegionExtractor extends
IoTDBNonDataRegionExtractor {
}
@Override
- public void customize(PipeParameters parameters,
PipeExtractorRuntimeConfiguration configuration)
+ public void customize(
+ final PipeParameters parameters, final PipeExtractorRuntimeConfiguration
configuration)
throws Exception {
super.customize(parameters, configuration);
listenedTypeSet =
ConfigRegionListeningFilter.parseListeningPlanTypeSet(parameters);
@@ -90,13 +92,20 @@ public class IoTDBConfigRegionExtractor extends
IoTDBNonDataRegionExtractor {
}
@Override
- protected boolean isTypeListened(Event event) {
+ protected long getMaxBlockingTimeMs() {
+ // The connector keeps submitting tasks and relies on the queue to block when it is
empty
+ // Here we return a blocking timeout, so that polling blocks, to be consistent with the dataNode connector
+ return
PipeConfig.getInstance().getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
+ }
+
+ @Override
+ protected boolean isTypeListened(final Event event) {
return listenedTypeSet.contains(
((PipeConfigRegionWritePlanEvent)
event).getConfigPhysicalPlan().getType());
}
@Override
- protected void confineHistoricalEventTransferTypes(PipeSnapshotEvent event) {
+ protected void confineHistoricalEventTransferTypes(final PipeSnapshotEvent
event) {
((PipeConfigRegionSnapshotEvent)
event).confineTransferredTypes(listenedTypeSet);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
index 6b6136e3f34..5c72cb74cf2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/schemaregion/IoTDBSchemaRegionExtractor.java
@@ -51,7 +51,7 @@ public class IoTDBSchemaRegionExtractor extends
IoTDBNonDataRegionExtractor {
// TODO: Delete this
@Override
- public void validate(PipeParameterValidator validator) throws Exception {
+ public void validate(final PipeParameterValidator validator) throws
Exception {
if (IoTDBDescriptor.getInstance()
.getConfig()
.getSchemaRegionConsensusProtocolClass()
@@ -63,7 +63,8 @@ public class IoTDBSchemaRegionExtractor extends
IoTDBNonDataRegionExtractor {
}
@Override
- public void customize(PipeParameters parameters,
PipeExtractorRuntimeConfiguration configuration)
+ public void customize(
+ final PipeParameters parameters, final PipeExtractorRuntimeConfiguration
configuration)
throws Exception {
super.customize(parameters, configuration);
@@ -109,19 +110,26 @@ public class IoTDBSchemaRegionExtractor extends
IoTDBNonDataRegionExtractor {
return PipeAgent.runtime().isSchemaLeaderReady(schemaRegionId) ?
super.supply() : null;
}
+ @Override
+ protected long getMaxBlockingTimeMs() {
+ // The dataNode processor can sleep if it supplies null
+ // Here we return 0, so that polling returns immediately, to be consistent with the data region
extractor
+ return 0;
+ }
+
@Override
protected AbstractPipeListeningQueue getListeningQueue() {
return PipeAgent.runtime().schemaListener(schemaRegionId);
}
@Override
- protected boolean isTypeListened(Event event) {
+ protected boolean isTypeListened(final Event event) {
return listenedTypeSet.contains(
((PipeSchemaRegionWritePlanEvent) event).getPlanNode().getType());
}
@Override
- protected void confineHistoricalEventTransferTypes(PipeSnapshotEvent event) {
+ protected void confineHistoricalEventTransferTypes(final PipeSnapshotEvent
event) {
((PipeSchemaRegionSnapshotEvent)
event).confineTransferredTypes(listenedTypeSet);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
index b56e79efe39..ac4758103a3 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/extractor/IoTDBNonDataRegionExtractor.java
@@ -139,7 +139,7 @@ public abstract class IoTDBNonDataRegionExtractor extends
IoTDBExtractor {
// Realtime
EnrichedEvent realtimeEvent;
do {
- realtimeEvent = (EnrichedEvent) iterator.next(0);
+ realtimeEvent = (EnrichedEvent) iterator.next(getMaxBlockingTimeMs());
if (Objects.isNull(realtimeEvent)) {
return null;
}
@@ -154,6 +154,8 @@ public abstract class IoTDBNonDataRegionExtractor extends
IoTDBExtractor {
return realtimeEvent;
}
+ protected abstract long getMaxBlockingTimeMs();
+
protected abstract boolean isTypeListened(Event event);
protected abstract void
confineHistoricalEventTransferTypes(PipeSnapshotEvent event);