This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 4687f7d6591 Pipe: Fixed the NPE caused by the compression timer
(#15782) (#15783)
4687f7d6591 is described below
commit 4687f7d6591ac8993f939c529e233b7286823862
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 19 12:00:17 2025 +0800
Pipe: Fixed the NPE caused by the compression timer (#15782) (#15783)
---
.../manager/pipe/agent/task/PipeConfigNodeSubtask.java | 9 ++++++---
.../protocol/airgap/IoTDBDataRegionAirGapConnector.java | 2 +-
.../protocol/thrift/async/IoTDBDataRegionAsyncConnector.java | 2 +-
.../protocol/thrift/sync/IoTDBDataRegionSyncConnector.java | 2 +-
4 files changed, 9 insertions(+), 6 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
index 381769d1df3..263dab213e6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
@@ -26,8 +26,9 @@ import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractConnectorSubtask;
import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
-import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
+import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
@@ -117,7 +118,8 @@ public class PipeConfigNodeSubtask extends
PipeAbstractConnectorSubtask {
final PipeTaskRuntimeConfiguration runtimeConfiguration =
new PipeTaskRuntimeConfiguration(
- new PipeTaskRuntimeEnvironment(pipeName, creationTime,
CONFIG_REGION_ID.getId()));
+ new PipeTaskProcessorRuntimeEnvironment(
+ pipeName, creationTime, CONFIG_REGION_ID.getId(), null));
processor =
PipeConfigNodeAgent.plugin()
@@ -142,7 +144,8 @@ public class PipeConfigNodeSubtask extends
PipeAbstractConnectorSubtask {
// 3. Customize connector
final PipeTaskRuntimeConfiguration runtimeConfiguration =
new PipeTaskRuntimeConfiguration(
- new PipeTaskRuntimeEnvironment(pipeName, creationTime,
CONFIG_REGION_ID.getId()));
+ new PipeTaskConnectorRuntimeEnvironment(
+ pipeName, creationTime, CONFIG_REGION_ID.getId()));
outputPipeConnector.customize(connectorParameters, runtimeConfiguration);
// 4. Handshake
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
index cc3e330656e..ee35ab46306 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
@@ -308,7 +308,7 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
@Override
protected byte[] compressIfNeeded(final byte[] reqInBytes) throws
IOException {
- if (Objects.isNull(compressionTimer)) {
+ if (Objects.isNull(compressionTimer) &&
Objects.nonNull(attributeSortedString)) {
compressionTimer =
PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 46f3cfb71d1..fd80f47c45f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -464,7 +464,7 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
@Override
public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws
IOException {
- if (Objects.isNull(compressionTimer)) {
+ if (Objects.isNull(compressionTimer) &&
Objects.nonNull(attributeSortedString)) {
compressionTimer =
PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index b8e5795484e..784ee14a55a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -497,7 +497,7 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
@Override
public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws
IOException {
- if (Objects.isNull(compressionTimer)) {
+ if (Objects.isNull(compressionTimer) &&
Objects.nonNull(attributeSortedString)) {
compressionTimer =
PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
}