This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/object_type by this push:
new 7ad59b6da69 Use getOrCreateDataPartition when executing
FetchDeviceLeader
7ad59b6da69 is described below
commit 7ad59b6da6981fc890992d6ba071cf31442046d6
Author: Yongzao <[email protected]>
AuthorDate: Mon Jul 14 11:35:45 2025 +0800
Use getOrCreateDataPartition when executing FetchDeviceLeader
---
.../apache/iotdb/TableModelSessionPoolExample.java | 10 ++++--
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 40 ++++++++++++++--------
2 files changed, 32 insertions(+), 18 deletions(-)
diff --git
a/example/session/src/main/java/org/apache/iotdb/TableModelSessionPoolExample.java
b/example/session/src/main/java/org/apache/iotdb/TableModelSessionPoolExample.java
index 03e2a14c046..0109cacc2c0 100644
---
a/example/session/src/main/java/org/apache/iotdb/TableModelSessionPoolExample.java
+++
b/example/session/src/main/java/org/apache/iotdb/TableModelSessionPoolExample.java
@@ -139,9 +139,13 @@ public class TableModelSessionPoolExample {
String correctURL =
session.getDeviceLeaderURL("test2", Arrays.asList("test1", "1",
"3"), isSetTag, 66);
System.out.println("Correct device leader URL: " + correctURL);
- String errorDbURL =
- session.getDeviceLeaderURL("test3", Arrays.asList("test1", "1",
"3"), isSetTag, 66);
- System.out.println("Error dbName device leader URL: " + errorDbURL);
+ try {
+ String errorDbURL =
+ session.getDeviceLeaderURL("test3", Arrays.asList("test1",
"1", "3"), isSetTag, 66);
+ System.out.println("Error dbName device leader URL: " + errorDbURL);
+ } catch (StatementExecutionException e) {
+ System.out.println(e.getMessage());
+ }
String errorDeviceURL =
session.getDeviceLeaderURL("test2", Arrays.asList("test1", "3",
"1"), isSetTag, 66);
System.out.println("Error deviceId device leader URL: " +
errorDeviceURL);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 7e05bc2f212..b1ecfda15e3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.db.audit.AuditLogger;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
@@ -3101,25 +3102,34 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
for (int i = 0; i < req.getIsSetTagSize(); i++) {
trueSegments[i] = req.getIsSetTag().get(i) ?
req.getDeviceId().get(start_index++) : null;
}
+ TTableDeviceLeaderResp resp = new TTableDeviceLeaderResp();
+ resp.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
IDeviceID deviceID = Factory.DEFAULT_FACTORY.create(trueSegments);
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartitionSlot(req.getTime());
DataPartitionQueryParam queryParam =
new DataPartitionQueryParam(deviceID,
Collections.singletonList(timePartitionSlot));
- DataPartition dataPartition =
- partitionFetcher.getDataPartition(
- Collections.singletonMap(req.getDbName(),
Collections.singletonList(queryParam)));
- TRegionReplicaSet targetRegionReplicaSet =
- dataPartition.getAllReplicaSets().stream().findFirst().orElse(null);
- TEndPoint targetEndPoint =
- targetRegionReplicaSet != null
- ?
targetRegionReplicaSet.getDataNodeLocations().get(0).getClientRpcEndPoint()
- : null;
- TTableDeviceLeaderResp resp = new TTableDeviceLeaderResp();
- resp.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
- if (targetEndPoint != null) {
- resp.setIp(targetEndPoint.getIp());
- resp.setPort(String.valueOf(targetEndPoint.getPort()));
- } else {
+ try {
+ DataPartition dataPartition =
+ partitionFetcher.getOrCreateDataPartition(
+ Collections.singletonMap(req.getDbName(),
Collections.singletonList(queryParam)));
+ TRegionReplicaSet targetRegionReplicaSet =
+ dataPartition.getAllReplicaSets().stream().findFirst().orElse(null);
+ TEndPoint targetEndPoint =
+ targetRegionReplicaSet != null
+ ?
targetRegionReplicaSet.getDataNodeLocations().get(0).getClientRpcEndPoint()
+ : null;
+ if (targetEndPoint != null) {
+ resp.setIp(targetEndPoint.getIp());
+ resp.setPort(String.valueOf(targetEndPoint.getPort()));
+ } else {
+ resp.setIp("");
+ resp.setPort("");
+ }
+ } catch (StatementAnalyzeException e) {
+ resp.setStatus(
+ RpcUtils.getStatus(
+ TSStatusCode.EXECUTE_STATEMENT_ERROR,
+ "Failed to fetch device leader: " + e.getMessage()));
resp.setIp("");
resp.setPort("");
}