This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c434dc6a31 Pipe: Changed the hybrid switching status to avoid first 
data is not synced realtime in hybrid mode (#12495)
5c434dc6a31 is described below

commit 5c434dc6a31e3a39d027544429230c2ff2f50d98
Author: Caideyipi <[email protected]>
AuthorDate: Fri May 10 10:16:49 2024 +0800

    Pipe: Changed the hybrid switching status to avoid first data is not synced 
realtime in hybrid mode (#12495)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../PipeRealtimeDataRegionHybridExtractor.java     | 67 ++++++++++++----------
 .../apache/iotdb/commons/conf/CommonConfig.java    | 11 ++++
 .../iotdb/commons/conf/CommonDescriptor.java       |  5 ++
 .../iotdb/commons/pipe/config/PipeConfig.java      |  7 +++
 4 files changed, 60 insertions(+), 30 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 2493bac78e5..2161cefc87c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -27,7 +27,9 @@ import 
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
+import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
+import org.apache.iotdb.db.pipe.metric.PipeExtractorMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -37,6 +39,7 @@ import 
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegionExtractor {
@@ -44,12 +47,11 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
 
-  private volatile boolean isStartedToSupply = false;
   private final AtomicInteger processorEventCollectorQueueTsFileSize = new 
AtomicInteger(0);
   private final AtomicInteger connectorInputPendingQueueTsFileSize = new 
AtomicInteger(0);
 
   @Override
-  protected void doExtract(PipeRealtimeEvent event) {
+  protected void doExtract(final PipeRealtimeEvent event) {
     final Event eventToExtract = event.getEvent();
 
     if (eventToExtract instanceof TabletInsertionEvent) {
@@ -78,7 +80,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     return shouldExtractInsertion;
   }
 
-  private void extractTabletInsertion(PipeRealtimeEvent event) {
+  private void extractTabletInsertion(final PipeRealtimeEvent event) {
     if (canNotUseTabletAnyMore()) {
       event
           .getTsFileEpoch()
@@ -107,7 +109,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
       case USING_TABLET:
       case USING_BOTH:
         if (!pendingQueue.waitedOffer(event)) {
-          // this would not happen, but just in case.
+          // This would not happen, but just in case.
           // pendingQueue is unbounded, so it should never reach capacity.
           final String errorMessage =
               String.format(
@@ -131,7 +133,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     }
   }
 
-  private void extractTsFileInsertion(PipeRealtimeEvent event) {
+  private void extractTsFileInsertion(final PipeRealtimeEvent event) {
     event
         .getTsFileEpoch()
         .migrateState(
@@ -178,7 +180,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
       case USING_TSFILE:
       case USING_BOTH:
         if (!pendingQueue.waitedOffer(event)) {
-          // this would not happen, but just in case.
+          // This would not happen, but just in case.
           // pendingQueue is unbounded, so it should never reach capacity.
           final String errorMessage =
               String.format(
@@ -203,19 +205,18 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
   }
 
   private boolean canNotUseTabletAnyMore() {
-    // In the following 4 cases, we should not extract any more tablet events. 
all the data
+    // In the following 5 cases, we should not extract any more tablet events. 
all the data
     // represented by the tablet events should be carried by the following 
tsfile event:
-    //  1. The historical extractor has not consumed all the data.
-    //  2. HybridExtractor will first try to do extraction in log mode, and 
then choose log or
-    //  tsfile mode to continue extracting, but if Wal size > maximum size of 
wal buffer,
+    //  1. If Wal size > maximum size of wal buffer,
     //  the write operation will be throttled, so we should not extract any 
more tablet events.
-    //  3. The number of pinned memtables has reached the dangerous threshold.
-    //  4. The number of tsfile events in the pending queue has exceeded the 
limit.
+    //  2. The number of pinned memtables has reached the dangerous threshold.
+    //  3. The number of historical tsFile events to transfer has exceeded the 
limit.
+    //  4. The number of realtime tsfile events to transfer has exceeded the 
limit.
     //  5. The number of linked tsfiles has reached the dangerous threshold.
-    return !isStartedToSupply
-        || mayWalSizeReachThrottleThreshold()
+    return mayWalSizeReachThrottleThreshold()
         || mayMemTablePinnedCountReachDangerousThreshold()
-        || isTsFileEventCountInQueueExceededLimit()
+        || isHistoricalTsFileEventCountExceededLimit()
+        || isRealtimeTsFileEventCountExceededLimit()
         || mayTsFileLinkedCountReachDangerousThreshold();
   }
 
@@ -229,7 +230,15 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
         >= PipeConfig.getInstance().getPipeMaxAllowedPinnedMemTableCount();
   }
 
-  private boolean isTsFileEventCountInQueueExceededLimit() {
+  private boolean isHistoricalTsFileEventCountExceededLimit() {
+    final IoTDBDataRegionExtractor extractor =
+        PipeExtractorMetrics.getInstance().getExtractorMap().get(getTaskID());
+    return Objects.nonNull(extractor)
+        && extractor.getHistoricalTsFileInsertionEventCount()
+            >= 
PipeConfig.getInstance().getPipeMaxAllowedHistoricalTsFilePerDataRegion();
+  }
+
+  private boolean isRealtimeTsFileEventCountExceededLimit() {
     return pendingQueue.getTsFileInsertionEventCount()
             + processorEventCollectorQueueTsFileSize.get()
             + connectorInputPendingQueueTsFileSize.get()
@@ -241,24 +250,22 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
         >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
   }
 
-  public void informProcessorEventCollectorQueueTsFileSize(int queueSize) {
+  public void informProcessorEventCollectorQueueTsFileSize(final int 
queueSize) {
     processorEventCollectorQueueTsFileSize.set(queueSize);
   }
 
-  public void informConnectorInputPendingQueueTsFileSize(int queueSize) {
+  public void informConnectorInputPendingQueueTsFileSize(final int queueSize) {
     connectorInputPendingQueueTsFileSize.set(queueSize);
   }
 
   @Override
   public Event supply() {
-    isStartedToSupply = true;
-
     PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) 
pendingQueue.directPoll();
 
     while (realtimeEvent != null) {
-      Event suppliedEvent;
+      final Event suppliedEvent;
 
-      // used to judge type of event, not directly for supplying.
+      // Used to judge the type of the event, not directly for supplying.
       final Event eventToSupply = realtimeEvent.getEvent();
       if (eventToSupply instanceof TabletInsertionEvent) {
         suppliedEvent = supplyTabletInsertion(realtimeEvent);
@@ -285,11 +292,11 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
       realtimeEvent = (PipeRealtimeEvent) pendingQueue.directPoll();
     }
 
-    // means the pending queue is empty.
+    // Means the pending queue is empty.
     return null;
   }
 
-  private Event supplyTabletInsertion(PipeRealtimeEvent event) {
+  private Event supplyTabletInsertion(final PipeRealtimeEvent event) {
     event
         .getTsFileEpoch()
         .migrateState(
@@ -307,7 +314,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
     switch (state) {
       case USING_TSFILE:
-        // if the state is USING_TSFILE, discard the event and poll the next 
one.
+        // If the state is USING_TSFILE, discard the event and poll the next 
one.
         return null;
       case EMPTY:
       case USING_TABLET:
@@ -316,7 +323,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
         if 
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
 {
           return event.getEvent();
         } else {
-          // if the event's reference count can not be increased, it means the 
data represented by
+          // If the event's reference count can not be increased, it means the 
data represented by
           // this event is not reliable anymore. but the data represented by 
this event
           // has been carried by the following tsfile event, so we can just 
discard this event.
           event.getTsFileEpoch().migrateState(this, s -> 
TsFileEpoch.State.USING_BOTH);
@@ -329,13 +336,13 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
     }
   }
 
-  private Event supplyTsFileInsertion(PipeRealtimeEvent event) {
+  private Event supplyTsFileInsertion(final PipeRealtimeEvent event) {
     event
         .getTsFileEpoch()
         .migrateState(
             this,
             state -> {
-              // this would not happen, but just in case.
+              // This would not happen, but just in case.
               if (state.equals(TsFileEpoch.State.EMPTY)) {
                 LOGGER.error(
                     String.format("EMPTY TsFileEpoch when supplying TsFile 
Event %s", event));
@@ -347,7 +354,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     final TsFileEpoch.State state = event.getTsFileEpoch().getState(this);
     switch (state) {
       case USING_TABLET:
-        // if the state is USING_TABLET, discard the event and poll the next 
one.
+        // If the state is USING_TABLET, discard the event and poll the next 
one.
         return null;
       case EMPTY:
       case USING_TSFILE:
@@ -356,7 +363,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
         if 
(event.increaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()))
 {
           return event.getEvent();
         } else {
-          // if the event's reference count can not be increased, it means the 
data represented by
+          // If the event's reference count can not be increased, it means the 
data represented by
           // this event is not reliable anymore. the data has been lost. we 
simply discard this
           // event
           // and report the exception to PipeRuntimeAgent.
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 493ea0651ac..86014cd7b6c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -207,6 +207,7 @@ public class CommonConfig {
   private boolean pipeAirGapReceiverEnabled = false;
   private int pipeAirGapReceiverPort = 9780;
 
+  private int pipeMaxAllowedHistoricalTsFilePerDataRegion = 100;
   private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 2;
   private int pipeMaxAllowedPinnedMemTableCount = 50;
   private long pipeMaxAllowedLinkedTsFileCount = 100;
@@ -830,6 +831,16 @@ public class CommonConfig {
     return pipeAirGapReceiverPort;
   }
 
+  public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
+    return pipeMaxAllowedHistoricalTsFilePerDataRegion;
+  }
+
+  // Sets the limit returned by getPipeMaxAllowedHistoricalTsFilePerDataRegion().
+  public void setPipeMaxAllowedHistoricalTsFilePerDataRegion(
+      int pipeMaxAllowedHistoricalTsFilePerDataRegion) {
+    this.pipeMaxAllowedHistoricalTsFilePerDataRegion = pipeMaxAllowedHistoricalTsFilePerDataRegion;
+  }
+
   public int getPipeMaxAllowedPendingTsFileEpochPerDataRegion() {
     return pipeMaxAllowedPendingTsFileEpochPerDataRegion;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 2bcc3400135..7c9f9c813ee 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -425,6 +425,11 @@ public class CommonDescriptor {
                 "pipe_air_gap_receiver_port",
                 Integer.toString(config.getPipeAirGapReceiverPort()))));
 
+    config.setPipeMaxAllowedHistoricalTsFilePerDataRegion(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_max_allowed_historical_tsfile_per_data_region",
+                
String.valueOf(config.getPipeMaxAllowedHistoricalTsFilePerDataRegion()))));
     config.setPipeMaxAllowedPendingTsFileEpochPerDataRegion(
         Integer.parseInt(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index dbe4b6a9c9f..7303659d51b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -177,6 +177,10 @@ public class PipeConfig {
 
   /////////////////////////////// Hybrid Mode ///////////////////////////////
 
+  public int getPipeMaxAllowedHistoricalTsFilePerDataRegion() {
+    return COMMON_CONFIG.getPipeMaxAllowedHistoricalTsFilePerDataRegion();
+  }
+
   public int getPipeMaxAllowedPendingTsFileEpochPerDataRegion() {
     return COMMON_CONFIG.getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
   }
@@ -329,6 +333,9 @@ public class PipeConfig {
     LOGGER.info("PipeAirGapReceiverEnabled: {}", 
getPipeAirGapReceiverEnabled());
     LOGGER.info("PipeAirGapReceiverPort: {}", getPipeAirGapReceiverPort());
 
+    LOGGER.info(
+        "PipeMaxAllowedHistoricalTsFilePerDataRegion: {}",
+        getPipeMaxAllowedHistoricalTsFilePerDataRegion());
     LOGGER.info(
         "PipeMaxAllowedPendingTsFileEpochPerDataRegion: {}",
         getPipeMaxAllowedPendingTsFileEpochPerDataRegion());

Reply via email to