This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new cb2da102991 [To dev/1.3] Pipe: Optimized the degrading logger & 
Deleted useless UT & Copied some historical filter logic from dev/1.3 (#16019) 
(#16020)
cb2da102991 is described below

commit cb2da10299112b64428f64bf04919548d4c39b52
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jul 24 14:47:44 2025 +0800

    [To dev/1.3] Pipe: Optimized the degrading logger & Deleted useless UT & 
Copied some historical filter logic from dev/1.3 (#16019) (#16020)
    
    * logger
    
    * fix-ut
    
    * Update PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
---
 .../PipeRealtimeDataRegionHybridExtractor.java     | 14 ++++--
 .../PipeDataNodeRemainingEventAndTimeOperator.java |  4 ++
 .../iotdb/db/pipe/connector/PipeConnectorTest.java | 54 ----------------------
 3 files changed, 15 insertions(+), 57 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 51bd3819f54..d745d1a171f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -30,6 +30,8 @@ import 
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
+import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
@@ -41,6 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
+import java.util.Optional;
 
 public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegionExtractor {
 
@@ -219,13 +222,18 @@ public class PipeRealtimeDataRegionHybridExtractor 
extends PipeRealtimeDataRegio
     final boolean mayInsertNodeMemoryReachDangerousThreshold =
         floatingMemoryUsageInByte * pipeCount >= 
totalFloatingMemorySizeInBytes;
     if (mayInsertNodeMemoryReachDangerousThreshold && 
event.mayExtractorUseTablets(this)) {
+      final PipeDataNodeRemainingEventAndTimeOperator operator =
+          
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.get(pipeID);
       LOGGER.info(
-          "Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory 
usage of the insert node {} has reached the dangerous threshold {}",
+          "Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory 
usage of the insert node {} has reached the dangerous threshold of single pipe 
{}, event count: {}",
           pipeName,
           dataRegionId,
           event.getTsFileEpoch().getFilePath(),
-          floatingMemoryUsageInByte * pipeCount,
-          totalFloatingMemorySizeInBytes);
+          floatingMemoryUsageInByte,
+          totalFloatingMemorySizeInBytes / pipeCount,
+          Optional.ofNullable(operator)
+              
.map(PipeDataNodeRemainingEventAndTimeOperator::getInsertNodeEventCount)
+              .orElse(0));
     }
     return mayInsertNodeMemoryReachDangerousThreshold;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 0308e9b5b63..996b71cfc3d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -116,6 +116,10 @@ public class PipeDataNodeRemainingEventAndTimeOperator 
extends PipeRemainingOper
     return remainingEvents >= 0 ? remainingEvents : 0;
   }
 
+  public int getInsertNodeEventCount() {
+    return insertNodeEventCount.get();
+  }
+
   long getRemainingEvents() {
     final long remainingEvents =
         tsfileEventCount.get()
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java
index f06c5de1168..5d9e444da24 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java
@@ -26,7 +26,6 @@ import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionA
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -35,24 +34,6 @@ import java.util.HashMap;
 
 public class PipeConnectorTest {
 
-  @Test(expected = PipeParameterNotValidException.class)
-  public void testIoTDBLegacyPipeConnectorToSelf() throws Exception {
-    try (IoTDBLegacyPipeConnector connector = new IoTDBLegacyPipeConnector()) {
-      connector.validate(
-          new PipeParameterValidator(
-              new PipeParameters(
-                  new HashMap<String, String>() {
-                    {
-                      put(
-                          PipeConnectorConstant.CONNECTOR_KEY,
-                          
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName());
-                      put(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY, 
"127.0.0.1");
-                      put(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY, 
"6667");
-                    }
-                  })));
-    }
-  }
-
   @Test
   public void testIoTDBLegacyPipeConnectorToOthers() {
     try (IoTDBLegacyPipeConnector connector = new IoTDBLegacyPipeConnector()) {
@@ -73,24 +54,6 @@ public class PipeConnectorTest {
     }
   }
 
-  @Test(expected = PipeParameterNotValidException.class)
-  public void testIoTDBThriftSyncConnectorToSelf() throws Exception {
-    try (IoTDBDataRegionSyncConnector connector = new 
IoTDBDataRegionSyncConnector()) {
-      connector.validate(
-          new PipeParameterValidator(
-              new PipeParameters(
-                  new HashMap<String, String>() {
-                    {
-                      put(
-                          PipeConnectorConstant.CONNECTOR_KEY,
-                          
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName());
-                      put(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY, 
"127.0.0.1");
-                      put(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY, 
"6667");
-                    }
-                  })));
-    }
-  }
-
   @Test
   public void testIoTDBThriftSyncConnectorToOthers() {
     try (IoTDBDataRegionSyncConnector connector = new 
IoTDBDataRegionSyncConnector()) {
@@ -111,23 +74,6 @@ public class PipeConnectorTest {
     }
   }
 
-  @Test(expected = PipeParameterNotValidException.class)
-  public void testIoTDBThriftAsyncConnectorToSelf() throws Exception {
-    try (IoTDBDataRegionAsyncConnector connector = new 
IoTDBDataRegionAsyncConnector()) {
-      connector.validate(
-          new PipeParameterValidator(
-              new PipeParameters(
-                  new HashMap<String, String>() {
-                    {
-                      put(
-                          PipeConnectorConstant.CONNECTOR_KEY,
-                          
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
-                      put(PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, 
"127.0.0.1:6667");
-                    }
-                  })));
-    }
-  }
-
   @Test
   public void testIoTDBThriftAsyncConnectorToOthers() {
     try (IoTDBDataRegionAsyncConnector connector = new 
IoTDBDataRegionAsyncConnector()) {

Reply via email to