This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new cb2da102991 [To dev/1.3] Pipe: Optimized the degrading logger &
Deleted useless UT & Copied some historical filter logic from dev/1.3 (#16019)
(#16020)
cb2da102991 is described below
commit cb2da10299112b64428f64bf04919548d4c39b52
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jul 24 14:47:44 2025 +0800
[To dev/1.3] Pipe: Optimized the degrading logger & Deleted useless UT &
Copied some historical filter logic from dev/1.3 (#16019) (#16020)
* logger
* fix-ut
* Update PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
---
.../PipeRealtimeDataRegionHybridExtractor.java | 14 ++++--
.../PipeDataNodeRemainingEventAndTimeOperator.java | 4 ++
.../iotdb/db/pipe/connector/PipeConnectorTest.java | 54 ----------------------
3 files changed, 15 insertions(+), 57 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 51bd3819f54..d745d1a171f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -30,6 +30,8 @@ import
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
+import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
+import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
@@ -41,6 +43,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
+import java.util.Optional;
public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegionExtractor {
@@ -219,13 +222,18 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
final boolean mayInsertNodeMemoryReachDangerousThreshold =
floatingMemoryUsageInByte * pipeCount >=
totalFloatingMemorySizeInBytes;
if (mayInsertNodeMemoryReachDangerousThreshold &&
event.mayExtractorUseTablets(this)) {
+ final PipeDataNodeRemainingEventAndTimeOperator operator =
+
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.get(pipeID);
LOGGER.info(
- "Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory
usage of the insert node {} has reached the dangerous threshold {}",
+ "Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory
usage of the insert node {} has reached the dangerous threshold of single pipe
{}, event count: {}",
pipeName,
dataRegionId,
event.getTsFileEpoch().getFilePath(),
- floatingMemoryUsageInByte * pipeCount,
- totalFloatingMemorySizeInBytes);
+ floatingMemoryUsageInByte,
+ totalFloatingMemorySizeInBytes / pipeCount,
+ Optional.ofNullable(operator)
+
.map(PipeDataNodeRemainingEventAndTimeOperator::getInsertNodeEventCount)
+ .orElse(0));
}
return mayInsertNodeMemoryReachDangerousThreshold;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 0308e9b5b63..996b71cfc3d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -116,6 +116,10 @@ public class PipeDataNodeRemainingEventAndTimeOperator
extends PipeRemainingOper
return remainingEvents >= 0 ? remainingEvents : 0;
}
+ public int getInsertNodeEventCount() {
+ return insertNodeEventCount.get();
+ }
+
long getRemainingEvents() {
final long remainingEvents =
tsfileEventCount.get()
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java
index f06c5de1168..5d9e444da24 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java
@@ -26,7 +26,6 @@ import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionA
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
-import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
import org.junit.Assert;
import org.junit.Test;
@@ -35,24 +34,6 @@ import java.util.HashMap;
public class PipeConnectorTest {
- @Test(expected = PipeParameterNotValidException.class)
- public void testIoTDBLegacyPipeConnectorToSelf() throws Exception {
- try (IoTDBLegacyPipeConnector connector = new IoTDBLegacyPipeConnector()) {
- connector.validate(
- new PipeParameterValidator(
- new PipeParameters(
- new HashMap<String, String>() {
- {
- put(
- PipeConnectorConstant.CONNECTOR_KEY,
-
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName());
- put(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY,
"127.0.0.1");
- put(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY,
"6667");
- }
- })));
- }
- }
-
@Test
public void testIoTDBLegacyPipeConnectorToOthers() {
try (IoTDBLegacyPipeConnector connector = new IoTDBLegacyPipeConnector()) {
@@ -73,24 +54,6 @@ public class PipeConnectorTest {
}
}
- @Test(expected = PipeParameterNotValidException.class)
- public void testIoTDBThriftSyncConnectorToSelf() throws Exception {
- try (IoTDBDataRegionSyncConnector connector = new
IoTDBDataRegionSyncConnector()) {
- connector.validate(
- new PipeParameterValidator(
- new PipeParameters(
- new HashMap<String, String>() {
- {
- put(
- PipeConnectorConstant.CONNECTOR_KEY,
-
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName());
- put(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY,
"127.0.0.1");
- put(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY,
"6667");
- }
- })));
- }
- }
-
@Test
public void testIoTDBThriftSyncConnectorToOthers() {
try (IoTDBDataRegionSyncConnector connector = new
IoTDBDataRegionSyncConnector()) {
@@ -111,23 +74,6 @@ public class PipeConnectorTest {
}
}
- @Test(expected = PipeParameterNotValidException.class)
- public void testIoTDBThriftAsyncConnectorToSelf() throws Exception {
- try (IoTDBDataRegionAsyncConnector connector = new
IoTDBDataRegionAsyncConnector()) {
- connector.validate(
- new PipeParameterValidator(
- new PipeParameters(
- new HashMap<String, String>() {
- {
- put(
- PipeConnectorConstant.CONNECTOR_KEY,
-
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
- put(PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY,
"127.0.0.1:6667");
- }
- })));
- }
- }
-
@Test
public void testIoTDBThriftAsyncConnectorToOthers() {
try (IoTDBDataRegionAsyncConnector connector = new
IoTDBDataRegionAsyncConnector()) {