This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5c434dc6a31 Pipe: Changed the hybrid switching status to avoid first
data is not synced realtime in hybrid mode (#12495)
5c434dc6a31 is described below
commit 5c434dc6a31e3a39d027544429230c2ff2f50d98
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 10 10:16:49 2024 +0800
Pipe: Changed the hybrid switching status to avoid first data is not synced
realtime in hybrid mode (#12495)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../PipeRealtimeDataRegionHybridExtractor.java | 67 ++++++++++++----------
.../apache/iotdb/commons/conf/CommonConfig.java | 11 ++++
.../iotdb/commons/conf/CommonDescriptor.java | 5 ++
.../iotdb/commons/pipe/config/PipeConfig.java | 7 +++
4 files changed, 60 insertions(+), 30 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 2493bac78e5..2161cefc87c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -27,7 +27,9 @@ import
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
+import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
+import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.pipe.api.event.Event;
@@ -37,6 +39,7 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegionExtractor {
@@ -44,12 +47,11 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
- private volatile boolean isStartedToSupply = false;
private final AtomicInteger processorEventCollectorQueueTsFileSize = new
AtomicInteger(0);
private final AtomicInteger connectorInputPendingQueueTsFileSize = new
AtomicInteger(0);
@Override
- protected void doExtract(PipeRealtimeEvent event) {
+ protected void doExtract(final PipeRealtimeEvent event) {
final Event eventToExtract = event.getEvent();
if (eventToExtract instanceof TabletInsertionEvent) {
@@ -78,7 +80,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
return shouldExtractInsertion;
}
- private void extractTabletInsertion(PipeRealtimeEvent event) {
+ private void extractTabletInsertion(final PipeRealtimeEvent event) {
if (canNotUseTabletAnyMore()) {
event
.getTsFileEpoch()
@@ -107,7 +109,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
case USING_TABLET:
case USING_BOTH:
if (!pendingQueue.waitedOffer(event)) {
- // this would not happen, but just in case.
+ // This would not happen, but just in case.
// pendingQueue is unbounded, so it should never reach capacity.
final String errorMessage =
String.format(
@@ -131,7 +133,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
}
}
- private void extractTsFileInsertion(PipeRealtimeEvent event) {
+ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
event
.getTsFileEpoch()
.migrateState(
@@ -178,7 +180,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
case USING_TSFILE:
case USING_BOTH:
if (!pendingQueue.waitedOffer(event)) {
- // this would not happen, but just in case.
+ // This would not happen, but just in case.
// pendingQueue is unbounded, so it should never reach capacity.
final String errorMessage =
String.format(
@@ -203,19 +205,18 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
}
private boolean canNotUseTabletAnyMore() {
- // In the following 4 cases, we should not extract any more tablet events.
all the data
+ // In the following 5 cases, we should not extract any more tablet events.
All the data
// represented by the tablet events should be carried by the following
tsfile event:
- // 1. The historical extractor has not consumed all the data.
- // 2. HybridExtractor will first try to do extraction in log mode, and
then choose log or
- // tsfile mode to continue extracting, but if Wal size > maximum size of
wal buffer,
+ // 1. If the WAL size > the maximum size of the WAL buffer,
// the write operation will be throttled, so we should not extract any
more tablet events.
- // 3. The number of pinned memtables has reached the dangerous threshold.
- // 4. The number of tsfile events in the pending queue has exceeded the
limit.
+ // 2. The number of pinned memtables has reached the dangerous threshold.
+ // 3. The number of historical tsfile events to transfer has exceeded the
limit.
+ // 4. The number of realtime tsfile events to transfer has exceeded the
limit.
// 5. The number of linked tsfiles has reached the dangerous threshold.
- return !isStartedToSupply
- || mayWalSizeReachThrottleThreshold()
+ return mayWalSizeReachThrottleThreshold()
|| mayMemTablePinnedCountReachDangerousThreshold()
- || isTsFileEventCountInQueueExceededLimit()
+ || isHistoricalTsFileEventCountExceededLimit()
+ || isRealtimeTsFileEventCountExceededLimit()
|| mayTsFileLinkedCountReachDangerousThreshold();
}
@@ -229,7 +230,15 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
>= PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
}
- private boolean isTsFileEventCountInQueueExceededLimit() {
+ private boolean isHistoricalTsFileEventCountExceededLimit() {
+ final IoTDBDataRegionExtractor extractor =
+ PipeExtractorMetrics.getInstance().getExtractorMap().get(getTaskID());
+ return Objects.nonNull(extractor)
+ && extractor.getHistoricalTsFileInsertionEventCount()
+ >=
PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion();
+ }
+
+ private boolean isRealtimeTsFileEventCountExceededLimit() {
return pendingQueue.getTsFileInsertionEventCount()
+ processorEventCollectorQueueTsFileSize.get()
+ connectorInputPendingQueueTsFileSize.get()
@@ -241,24 +250,22 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
>= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
}
- public void informProcessorEventCollectorQueueTsFileSize(int queueSize) {
+ public void informProcessorEventCollectorQueueTsFileSize(final int
queueSize) {
processorEventCollectorQueueTsFileSize.set(queueSize);
}
- public void informConnectorInputPendingQueueTsFileSize(int queueSize) {
+ public void informConnectorInputPendingQueueTsFileSize(final int queueSize) {
connectorInputPendingQueueTsFileSize.set(queueSize);
}
@Override
public Event supply() {
- isStartedToSupply = true;
-
PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent)
pendingQueue.directPoll();
while (realtimeEvent != null) {
- Event suppliedEvent;
+ final Event suppliedEvent;
- // used to judge type of event, not directly for supplying.
+ // Used to judge the type of the event, not directly for supplying.
final Event eventToSupply = realtimeEvent.getEvent();
if (eventToSupply instanceof TabletInsertionEvent) {
suppliedEvent = supplyTabletInsertion(realtimeEvent);
@@ -285,11 +292,11 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
realtimeEvent = (PipeRealtimeEvent) pendingQueue.directPoll();
}
- // means the pending queue is empty.
+ // Means the pending queue is empty.
return null;
}
- private Event supplyTabletInsertion(PipeRealtimeEvent event) {
+ private Event supplyTabletInsertion(final PipeRealtimeEvent event) {
event
.getTsFileEpoch()
.migrateState(
@@ -307,7 +314,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
switch (state) {
case USING_TSFILE:
- // if the state is USING_TSFILE, discard the event and poll the next
one.
+ // If the state is USING_TSFILE, discard the event and poll the next
one.
return null;
case EMPTY:
case USING_TABLET:
@@ -316,7 +323,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
if
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
{
return event.getEvent();
} else {
- // if the event's reference count can not be increased, it means the
data represented by
+ // If the event's reference count can not be increased, it means the
data represented by
// this event is not reliable anymore. but the data represented by
this event
// has been carried by the following tsfile event, so we can just
discard this event.
event.getTsFileEpoch().migrateState(this, s ->
TsFileEpoch.State.USING_BOTH);
@@ -329,13 +336,13 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
}
}
- private Event supplyTsFileInsertion(PipeRealtimeEvent event) {
+ private Event supplyTsFileInsertion(final PipeRealtimeEvent event) {
event
.getTsFileEpoch()
.migrateState(
this,
state -> {
- // this would not happen, but just in case.
+ // This would not happen, but just in case.
if (state.equals(TsFileEpoch.State.EMPTY)) {
LOGGER.error(
String.format("EMPTY TsFileEpoch when supplying TsFile
Event %s", event));
@@ -347,7 +354,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
switch (state) {
case USING_TABLET:
- // if the state is USING_TABLET, discard the event and poll the next
one.
+ // If the state is USING_TABLET, discard the event and poll the next
one.
return null;
case EMPTY:
case USING_TSFILE:
@@ -356,7 +363,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
if
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
{
return event.getEvent();
} else {
- // if the event's reference count can not be increased, it means the
data represented by
+ // If the event's reference count can not be increased, it means the
data represented by
// this event is not reliable anymore. the data has been lost. we
simply discard this
// event
// and report the exception to PipeRuntimeAgent.
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 493ea0651ac..86014cd7b6c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -207,6 +207,7 @@ public class CommonConfig {
private boolean pipeAirGapReceiverEnabled = false;
private int pipeAirGapReceiverPort = 9780;
+ private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 2;
private int pipeMaxAllowedPinnedMemTableCount = 50;
private long pipeMaxAllowedLinkedTsFileCount = 100;
@@ -830,6 +831,16 @@ public class CommonConfig {
return pipeAirGapReceiverPort;
}
+ public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
+ return pipeMaxAllowedHistoricalTsFilePerDataRegion;
+ }
+
+ public void setPipeMaxAllowedHistoricalTsFilePerDataRegion(
+ int pipeMaxAllowedHistoricalTsFilePerDataRegion) {
+ this.pipeMaxAllowedHistoricalTsFilePerDataRegion =
+ pipeMaxAllowedHistoricalTsFilePerDataRegion;
+ }
+
public int getPipeMaxAllowedPendingTsFileEpochPerDataRegion() {
return pipeMaxAllowedPendingTsFileEpochPerDataRegion;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 2bcc3400135..7c9f9c813ee 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -425,6 +425,11 @@ public class CommonDescriptor {
"pipe_air_gap_receiver_port",
Integer.toString(config.getPipeAirGapReceiverPort()))));
+ config.setPipeMaxAllowedHistoricalTsFilePerDataRegion(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_max_allowed_historical_tsfile_per_data_region",
+
String.valueOf(config.getPipeMaxAllowedHistoricalTsFilePerDataRegion()))));
config.setPipeMaxAllowedPendingTsFileEpochPerDataRegion(
Integer.parseInt(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index dbe4b6a9c9f..7303659d51b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -177,6 +177,10 @@ public class PipeConfig {
/////////////////////////////// Hybrid Mode ///////////////////////////////
+ public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
+ return COMMON_CONFIG.getPipeMaxAllowedHistoricalTsFilePerDataRegion();
+ }
+
public int getPipeMaxAllowedPendingTsFileEpochPerDataRegion() {
return COMMON_CONFIG.getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
}
@@ -329,6 +333,9 @@ public class PipeConfig {
LOGGER.info("PipeAirGapReceiverEnabled: {}",
getPipeAirGapReceiverEnabled());
LOGGER.info("PipeAirGapReceiverPort: {}", getPipeAirGapReceiverPort());
+ LOGGER.info(
+ "PipeMaxAllowedHistoricalTsFilePerDataRegion: {}",
+ getPipeMaxAllowedHistoricalTsFilePerDataRegion());
LOGGER.info(
"PipeMaxAllowedPendingTsFileEpochPerDataRegion: {}",
getPipeMaxAllowedPendingTsFileEpochPerDataRegion());