This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a6547154f3b04bca19ad75324a617cbd13e9f888
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jul 23 18:58:38 2025 +0800

    Pipe: Optimized the degrading logger & Deleted useless UT & Copied some 
historical filter logic from dev/1.3 (#16019)
    
    * logger
    
    * fix-ut
    
    * Update PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
    
    (cherry picked from commit 08ab2c9dcde2748165660a3308d058f63be1b80f)
---
 ...oricalDataRegionTsFileAndDeletionExtractor.java | 22 +++------
 .../PipeRealtimeDataRegionHybridExtractor.java     | 14 ++++--
 .../PipeDataNodeRemainingEventAndTimeOperator.java |  4 ++
 .../iotdb/db/pipe/connector/PipeConnectorTest.java | 54 ----------------------
 4 files changed, 22 insertions(+), 72 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
index 7b765c143a2..b9d536db936 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
@@ -51,6 +51,7 @@ import 
org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.utils.DateTimeUtils;
@@ -387,21 +388,6 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
     }
   }
 
-  private void flushDataRegionAllTsFiles() {
-    final DataRegion dataRegion =
-        StorageEngine.getInstance().getDataRegion(new 
DataRegionId(dataRegionId));
-    if (Objects.isNull(dataRegion)) {
-      return;
-    }
-
-    dataRegion.writeLock("Pipe: create historical TsFile extractor");
-    try {
-      dataRegion.syncCloseAllWorkingTsFileProcessors();
-    } finally {
-      dataRegion.writeUnlock();
-    }
-  }
-
   /**
    * IoTV2 will only resend event that contains un-replicated local write 
data. So we only extract
    * ProgressIndex containing local writes for comparison to prevent 
misjudgment on whether
@@ -563,6 +549,9 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
                           // Some resource may not be closed due to the 
control of
                           // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore 
them.
                           !resource.isClosed()
+                                  && 
Optional.ofNullable(resource.getProcessor())
+                                      
.map(TsFileProcessor::alreadyMarkedClosing)
+                                      .orElse(true)
                               || mayTsFileContainUnprocessedData(resource)
                                   && 
isTsFileResourceOverlappedWithTimeRange(resource)
                                   && 
mayTsFileResourceOverlappedWithPattern(resource)))
@@ -585,6 +574,9 @@ public class 
PipeHistoricalDataRegionTsFileAndDeletionExtractor
                           // Some resource may not be closed due to the 
control of
                           // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore 
them.
                           !resource.isClosed()
+                                  && 
Optional.ofNullable(resource.getProcessor())
+                                      
.map(TsFileProcessor::alreadyMarkedClosing)
+                                      .orElse(true)
                               || mayTsFileContainUnprocessedData(resource)
                                   && 
isTsFileResourceOverlappedWithTimeRange(resource)
                                   && 
mayTsFileResourceOverlappedWithPattern(resource)))
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 8ee2f7a15a1..f1ddff90c67 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -30,6 +30,8 @@ import 
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
+import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.pipe.api.event.Event;
@@ -40,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
+import java.util.Optional;
 
 public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegionExtractor {
 
@@ -218,13 +221,18 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
     final boolean mayInsertNodeMemoryReachDangerousThreshold =
         floatingMemoryUsageInByte * pipeCount >= 
totalFloatingMemorySizeInBytes;
     if (mayInsertNodeMemoryReachDangerousThreshold && 
event.mayExtractorUseTablets(this)) {
+      final PipeDataNodeRemainingEventAndTimeOperator operator =
+          
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.get(pipeID);
       LOGGER.info(
-          "Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory 
usage of the insert node {} has reached the dangerous threshold {}",
+          "Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory 
usage of the insert node {} has reached the dangerous threshold of single pipe 
{}, event count: {}",
           pipeName,
           dataRegionId,
           event.getTsFileEpoch().getFilePath(),
-          floatingMemoryUsageInByte * pipeCount,
-          totalFloatingMemorySizeInBytes);
+          floatingMemoryUsageInByte,
+          totalFloatingMemorySizeInBytes / pipeCount,
+          Optional.ofNullable(operator)
+              
.map(PipeDataNodeRemainingEventAndTimeOperator::getInsertNodeEventCount)
+              .orElse(0));
     }
     return mayInsertNodeMemoryReachDangerousThreshold;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 0308e9b5b63..996b71cfc3d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -116,6 +116,10 @@ public class PipeDataNodeRemainingEventAndTimeOperator 
extends PipeRemainingOper
     return remainingEvents >= 0 ? remainingEvents : 0;
   }
 
+  public int getInsertNodeEventCount() {
+    return insertNodeEventCount.get();
+  }
+
   long getRemainingEvents() {
     final long remainingEvents =
         tsfileEventCount.get()
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java
index f06c5de1168..5d9e444da24 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java
@@ -26,7 +26,6 @@ import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionA
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -35,24 +34,6 @@ import java.util.HashMap;
 
 public class PipeConnectorTest {
 
-  @Test(expected = PipeParameterNotValidException.class)
-  public void testIoTDBLegacyPipeConnectorToSelf() throws Exception {
-    try (IoTDBLegacyPipeConnector connector = new IoTDBLegacyPipeConnector()) {
-      connector.validate(
-          new PipeParameterValidator(
-              new PipeParameters(
-                  new HashMap<String, String>() {
-                    {
-                      put(
-                          PipeConnectorConstant.CONNECTOR_KEY,
-                          
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName());
-                      put(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY, 
"127.0.0.1");
-                      put(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY, 
"6667");
-                    }
-                  })));
-    }
-  }
-
   @Test
   public void testIoTDBLegacyPipeConnectorToOthers() {
     try (IoTDBLegacyPipeConnector connector = new IoTDBLegacyPipeConnector()) {
@@ -73,24 +54,6 @@ public class PipeConnectorTest {
     }
   }
 
-  @Test(expected = PipeParameterNotValidException.class)
-  public void testIoTDBThriftSyncConnectorToSelf() throws Exception {
-    try (IoTDBDataRegionSyncConnector connector = new 
IoTDBDataRegionSyncConnector()) {
-      connector.validate(
-          new PipeParameterValidator(
-              new PipeParameters(
-                  new HashMap<String, String>() {
-                    {
-                      put(
-                          PipeConnectorConstant.CONNECTOR_KEY,
-                          
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName());
-                      put(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY, 
"127.0.0.1");
-                      put(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY, 
"6667");
-                    }
-                  })));
-    }
-  }
-
   @Test
   public void testIoTDBThriftSyncConnectorToOthers() {
     try (IoTDBDataRegionSyncConnector connector = new 
IoTDBDataRegionSyncConnector()) {
@@ -111,23 +74,6 @@ public class PipeConnectorTest {
     }
   }
 
-  @Test(expected = PipeParameterNotValidException.class)
-  public void testIoTDBThriftAsyncConnectorToSelf() throws Exception {
-    try (IoTDBDataRegionAsyncConnector connector = new 
IoTDBDataRegionAsyncConnector()) {
-      connector.validate(
-          new PipeParameterValidator(
-              new PipeParameters(
-                  new HashMap<String, String>() {
-                    {
-                      put(
-                          PipeConnectorConstant.CONNECTOR_KEY,
-                          
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
-                      put(PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, 
"127.0.0.1:6667");
-                    }
-                  })));
-    }
-  }
-
   @Test
   public void testIoTDBThriftAsyncConnectorToOthers() {
     try (IoTDBDataRegionAsyncConnector connector = new 
IoTDBDataRegionAsyncConnector()) {

Reply via email to