This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new f21870cd712 [To dev/1.3] Pipe: Avoid throwing null pointer during
Close process (#16391) (#16402)
f21870cd712 is described below
commit f21870cd712059c118ae6364138ccd96b5313350
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Sep 12 16:13:36 2025 +0800
[To dev/1.3] Pipe: Avoid throwing null pointer during Close process
(#16391) (#16402)
* Pipe: Avoid throwing null pointer during Close process
* fix
* fix
(cherry picked from commit 1bc2c7f0f615600b52e3ee20ef2f6ca44f67df0c)
---
.../pipe/sink/client/IoTDBDataNodeSyncClientManager.java | 2 ++
.../commons/pipe/sink/client/IoTDBSyncClientManager.java | 16 ++++++++--------
2 files changed, 10 insertions(+), 8 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeSyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeSyncClientManager.java
index 9177eb04531..2a05fe61378 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeSyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeSyncClientManager.java
@@ -93,6 +93,7 @@ public class IoTDBDataNodeSyncClientManager extends
IoTDBSyncClientManager
&& endPoint != null
&& endPoint2ClientAndStatus.containsKey(endPoint)
&&
Boolean.TRUE.equals(endPoint2ClientAndStatus.get(endPoint).getRight())
+ && endPoint2ClientAndStatus.get(endPoint).getLeft() != null
? endPoint2ClientAndStatus.get(endPoint)
: getClient();
}
@@ -102,6 +103,7 @@ public class IoTDBDataNodeSyncClientManager extends
IoTDBSyncClientManager
&& endPoint != null
&& endPoint2ClientAndStatus.containsKey(endPoint)
&&
Boolean.TRUE.equals(endPoint2ClientAndStatus.get(endPoint).getRight())
+ && endPoint2ClientAndStatus.get(endPoint).getLeft() != null
? endPoint2ClientAndStatus.get(endPoint)
: getClient();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
index 789bb71ecda..7dc92b8cba2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java
@@ -119,7 +119,7 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
// Check whether any clients are available, if any client is available,
return directly
for (final Pair<IoTDBSyncClient, Boolean> clientAndStatus :
endPoint2ClientAndStatus.values()) {
- if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+ if (Boolean.TRUE.equals(clientAndStatus.getRight()) &&
clientAndStatus.getLeft() != null) {
return;
}
}
@@ -128,7 +128,7 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
// Reconstruct all dead clients
for (final Map.Entry<TEndPoint, Pair<IoTDBSyncClient, Boolean>> entry :
endPoint2ClientAndStatus.entrySet()) {
- if (Boolean.TRUE.equals(entry.getValue().getRight())) {
+ if (Boolean.TRUE.equals(entry.getValue().getRight()) &&
entry.getValue().getLeft() != null) {
continue;
}
@@ -137,7 +137,7 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
// Check whether any clients are available
for (final Pair<IoTDBSyncClient, Boolean> clientAndStatus :
endPoint2ClientAndStatus.values()) {
- if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+ if (Boolean.TRUE.equals(clientAndStatus.getRight()) &&
clientAndStatus.getLeft() != null) {
lastCheckClientStatusTimestamp = System.currentTimeMillis();
return;
}
@@ -304,7 +304,6 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
try {
if (clientAndStatus.getLeft() != null) {
clientAndStatus.getLeft().close();
- clientAndStatus.setLeft(null);
}
LOGGER.info("Client {}:{} closed.", endPoint.getIp(),
endPoint.getPort());
} catch (Exception e) {
@@ -335,7 +334,7 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
final int clientIndex = (int) (currentClientIndex++ % clientSize);
final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
endPoint2ClientAndStatus.get(endPointList.get(clientIndex));
- if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+ if (Boolean.TRUE.equals(clientAndStatus.getRight()) &&
clientAndStatus.getLeft() != null) {
return clientAndStatus;
}
}
@@ -352,7 +351,7 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
final int clientIndex = (int) (Math.random() * clientSize);
final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
endPoint2ClientAndStatus.get(endPointList.get(clientIndex));
- if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+ if (Boolean.TRUE.equals(clientAndStatus.getRight()) &&
clientAndStatus.getLeft() != null) {
return clientAndStatus;
}
@@ -361,7 +360,8 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
final int nextClientIndex = (clientIndex + tryCount + 1) % clientSize;
final Pair<IoTDBSyncClient, Boolean> nextClientAndStatus =
endPoint2ClientAndStatus.get(endPointList.get(nextClientIndex));
- if (Boolean.TRUE.equals(nextClientAndStatus.getRight())) {
+ if (Boolean.TRUE.equals(nextClientAndStatus.getRight())
+ && nextClientAndStatus.getLeft() != null) { // guard the candidate we return, not the initial pick
return nextClientAndStatus;
}
}
@@ -378,7 +378,7 @@ public abstract class IoTDBSyncClientManager extends
IoTDBClientManager implemen
for (final TEndPoint endPoint : endPointList) {
final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
endPoint2ClientAndStatus.get(endPoint);
- if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
+ if (Boolean.TRUE.equals(clientAndStatus.getRight()) &&
clientAndStatus.getLeft() != null) {
return clientAndStatus;
}
}