This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6639d5742b7 Pipe: Improved hybrid algorithm and prevented the calculation
of tsfiles in the connector pending queue from blocking the forwarding of
realtime requests (#12668)
6639d5742b7 is described below
commit 6639d5742b796ec49fbc3d09e4527b2546e12938
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 5 20:54:02 2024 +0800
Pipe: Improved hybrid algorithm and prevented the calculation of tsfiles in
the connector pending queue from blocking the forwarding of realtime requests
(#12668)
---
.../event/common/heartbeat/PipeHeartbeatEvent.java | 48 ++++++++--------------
.../realtime/PipeRealtimeDataRegionExtractor.java | 3 --
.../PipeRealtimeDataRegionHybridExtractor.java | 9 +---
3 files changed, 18 insertions(+), 42 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index c3d0798c7e9..9738f86a5dd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -25,8 +25,6 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
-import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
-import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
import org.apache.iotdb.db.pipe.metric.PipeHeartbeatEventMetrics;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.event.Event;
@@ -41,7 +39,6 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
private final String dataRegionId;
private String pipeName;
- private PipeRealtimeDataRegionExtractor extractor = null;
private long timePublished;
private long timeAssigned;
@@ -62,18 +59,18 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
private final boolean shouldPrintMessage;
- public PipeHeartbeatEvent(String dataRegionId, boolean shouldPrintMessage) {
+ public PipeHeartbeatEvent(final String dataRegionId, final boolean
shouldPrintMessage) {
super(null, null, null, Long.MIN_VALUE, Long.MAX_VALUE);
this.dataRegionId = dataRegionId;
this.shouldPrintMessage = shouldPrintMessage;
}
public PipeHeartbeatEvent(
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- String dataRegionId,
- long timePublished,
- boolean shouldPrintMessage) {
+ final String pipeName,
+ final PipeTaskMeta pipeTaskMeta,
+ final String dataRegionId,
+ final long timePublished,
+ final boolean shouldPrintMessage) {
super(pipeName, pipeTaskMeta, null, Long.MIN_VALUE, Long.MAX_VALUE);
this.dataRegionId = dataRegionId;
this.timePublished = timePublished;
@@ -81,12 +78,12 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
}
@Override
- public boolean internallyIncreaseResourceReferenceCount(String
holderMessage) {
+ public boolean internallyIncreaseResourceReferenceCount(final String
holderMessage) {
return true;
}
@Override
- public boolean internallyDecreaseResourceReferenceCount(String
holderMessage) {
+ public boolean internallyDecreaseResourceReferenceCount(final String
holderMessage) {
// PipeName == null indicates that the event is the raw event at disruptor,
// not the event copied and passed to the extractor
if (shouldPrintMessage && pipeName != null && LOGGER.isDebugEnabled()) {
@@ -102,11 +99,11 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
@Override
public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
- String pipeName,
- PipeTaskMeta pipeTaskMeta,
- PipePattern pattern,
- long startTime,
- long endTime) {
+ final String pipeName,
+ final PipeTaskMeta pipeTaskMeta,
+ final PipePattern pattern,
+ final long startTime,
+ final long endTime) {
// Should record PipeTaskMeta, for sometimes HeartbeatEvents should report
exceptions.
// Here we ignore parameters `pattern`, `startTime`, and `endTime`.
return new PipeHeartbeatEvent(
@@ -131,7 +128,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
/////////////////////////////// Delay Reporting
///////////////////////////////
- public void bindPipeName(String pipeName) {
+ public void bindPipeName(final String pipeName) {
if (shouldPrintMessage) {
this.pipeName = pipeName;
}
@@ -175,13 +172,13 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
/////////////////////////////// Queue size Reporting
///////////////////////////////
- public void recordDisruptorSize(RingBuffer<?> ringBuffer) {
+ public void recordDisruptorSize(final RingBuffer<?> ringBuffer) {
if (shouldPrintMessage) {
disruptorSize = ringBuffer.getBufferSize() - (int)
ringBuffer.remainingCapacity();
}
}
- public void recordExtractorQueueSize(UnboundedBlockingPendingQueue<Event>
pendingQueue) {
+ public void recordExtractorQueueSize(final
UnboundedBlockingPendingQueue<Event> pendingQueue) {
if (shouldPrintMessage) {
extractorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
extractorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
@@ -189,23 +186,12 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
}
}
- public void recordConnectorQueueSize(UnboundedBlockingPendingQueue<Event>
pendingQueue) {
+ public void recordConnectorQueueSize(final
UnboundedBlockingPendingQueue<Event> pendingQueue) {
if (shouldPrintMessage) {
connectorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
connectorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
connectorQueueSize = pendingQueue.size();
}
-
- if (extractor instanceof PipeRealtimeDataRegionHybridExtractor) {
- ((PipeRealtimeDataRegionHybridExtractor) extractor)
-
.informConnectorInputPendingQueueTsFileSize(pendingQueue.getTsFileInsertionEventCount());
- }
- }
-
- /////////////////////////////// For Hybrid extractor
///////////////////////////////
-
- public void bindExtractor(PipeRealtimeDataRegionExtractor extractor) {
- this.extractor = extractor;
}
/////////////////////////////// For Commit Ordering
///////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
index d809bd72a81..5415cb182e4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionExtractor.java
@@ -296,9 +296,6 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
protected abstract void doExtract(final PipeRealtimeEvent event);
protected void extractHeartbeat(final PipeRealtimeEvent event) {
- // Bind extractor so that the heartbeat event can later inform the
extractor of queue size
- ((PipeHeartbeatEvent) event.getEvent()).bindExtractor(this);
-
// Record the pending queue size before trying to put heartbeatEvent into
queue
((PipeHeartbeatEvent)
event.getEvent()).recordExtractorQueueSize(pendingQueue);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 381b8d73858..a0f075298a5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -40,15 +40,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
-import java.util.concurrent.atomic.AtomicInteger;
public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegionExtractor {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
- private final AtomicInteger connectorInputPendingQueueTsFileSize = new
AtomicInteger(0);
-
@Override
protected void doExtract(final PipeRealtimeEvent event) {
final Event eventToExtract = event.getEvent();
@@ -238,7 +235,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
}
private boolean isRealtimeTsFileEventCountExceededLimit() {
- return pendingQueue.getTsFileInsertionEventCount() +
connectorInputPendingQueueTsFileSize.get()
+ return pendingQueue.getTsFileInsertionEventCount()
>=
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
}
@@ -247,10 +244,6 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
>= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
}
- public void informConnectorInputPendingQueueTsFileSize(final int queueSize) {
- connectorInputPendingQueueTsFileSize.set(queueSize);
- }
-
@Override
public Event supply() {
PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent)
pendingQueue.directPoll();