This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch query_v2_py
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/query_v2_py by this push:
     new 9ad5507d6ec finished
9ad5507d6ec is described below

commit 9ad5507d6ecdeb879e00191065c2ff4711d9874a
Author: HTHou <[email protected]>
AuthorDate: Fri Mar 21 17:08:12 2025 +0800

    finished
---
 iotdb-client/client-py/iotdb/Session.py            | 266 ++++++++-------------
 .../client-py/iotdb/utils/IoTDBRpcDataSet.py       |  16 +-
 .../{IoTDBConnectionException.py => exception.py}  |  20 ++
 iotdb-client/client-py/iotdb/utils/rpc_utils.py    |  69 ++++++
 4 files changed, 204 insertions(+), 167 deletions(-)

diff --git a/iotdb-client/client-py/iotdb/Session.py 
b/iotdb-client/client-py/iotdb/Session.py
index dd49958f3a8..b4ede1c3400 100644
--- a/iotdb-client/client-py/iotdb/Session.py
+++ b/iotdb-client/client-py/iotdb/Session.py
@@ -60,16 +60,14 @@ from .thrift.rpc.ttypes import (
     TSInsertStringRecordsOfOneDeviceReq,
 )
 from .tsfile.utils.date_utils import parse_date_to_int
-from .utils.IoTDBConnectionException import IoTDBConnectionException
+from .utils import rpc_utils
+from .utils.exception import IoTDBConnectionException, RedirectException
 
 logger = logging.getLogger("IoTDB")
 warnings.simplefilter("always", DeprecationWarning)
 
 
 class Session(object):
-    SUCCESS_STATUS = 200
-    MULTIPLE_ERROR = 302
-    REDIRECTION_RECOMMEND = 400
     DEFAULT_FETCH_SIZE = 5000
     DEFAULT_USER = "root"
     DEFAULT_PASSWORD = "root"
@@ -207,7 +205,7 @@ class Session(object):
 
         try:
             open_resp = client.openSession(open_req)
-            Session.verify_success(open_resp.status)
+            rpc_utils.verify_success(open_resp.status)
 
             if self.protocol_version != open_resp.serverProtocolVersion:
                 logger.exception(
@@ -288,13 +286,13 @@ class Session(object):
         :param group_name: String, database name (starts from root)
         """
         try:
-            return Session.verify_success(
+            return rpc_utils.verify_success(
                 self.__client.setStorageGroup(self.__session_id, group_name)
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.setStorageGroup(self.__session_id, 
group_name)
                     )
                 except TTransport.TException as e1:
@@ -316,13 +314,13 @@ class Session(object):
         :param storage_group_lst: List, paths of the target databases.
         """
         try:
-            return Session.verify_success(
+            return rpc_utils.verify_success(
                 self.__client.deleteStorageGroups(self.__session_id, 
storage_group_lst)
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.deleteStorageGroups(
                             self.__session_id, storage_group_lst
                         )
@@ -366,12 +364,12 @@ class Session(object):
             alias,
         )
         try:
-            return 
Session.verify_success(self.__client.createTimeseries(request))
+            return 
rpc_utils.verify_success(self.__client.createTimeseries(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.createTimeseries(request)
                     )
                 except TTransport.TException as e1:
@@ -400,14 +398,14 @@ class Session(object):
             compressor_lst,
         )
         try:
-            return Session.verify_success(
+            return rpc_utils.verify_success(
                 self.__client.createAlignedTimeseries(request)
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.createAlignedTimeseries(request)
                     )
                 except TTransport.TException as e1:
@@ -450,12 +448,14 @@ class Session(object):
             alias_lst,
         )
         try:
-            return 
Session.verify_success(self.__client.createMultiTimeseries(request))
+            return rpc_utils.verify_success(
+                self.__client.createMultiTimeseries(request)
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.createMultiTimeseries(request)
                     )
                 except TTransport.TException as e1:
@@ -469,13 +469,13 @@ class Session(object):
         :param paths_list: List of time series path, which should be complete 
(starts from root)
         """
         try:
-            return Session.verify_success(
+            return rpc_utils.verify_success(
                 self.__client.deleteTimeseries(self.__session_id, paths_list)
             )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.deleteTimeseries(self.__session_id, 
paths_list)
                     )
                 except TTransport.TException as e1:
@@ -504,12 +504,12 @@ class Session(object):
             self.__session_id, paths_list, -9223372036854775808, end_time
         )
         try:
-            return Session.verify_success(self.__client.deleteData(request))
+            return rpc_utils.verify_success(self.__client.deleteData(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return 
Session.verify_success(self.__client.deleteData(request))
+                    return 
rpc_utils.verify_success(self.__client.deleteData(request))
                 except TTransport.TException as e1:
                     raise IoTDBConnectionException(e1) from None
             else:
@@ -524,12 +524,12 @@ class Session(object):
         """
         request = TSDeleteDataReq(self.__session_id, paths_list, start_time, 
end_time)
         try:
-            return Session.verify_success(self.__client.deleteData(request))
+            return rpc_utils.verify_success(self.__client.deleteData(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return 
Session.verify_success(self.__client.deleteData(request))
+                    return 
rpc_utils.verify_success(self.__client.deleteData(request))
                 except TTransport.TException as e1:
                     raise IoTDBConnectionException(e1) from None
             else:
@@ -556,7 +556,7 @@ class Session(object):
         try:
             connection = self.get_connection(device_id)
             request.sessionId = connection.session_id
-            return Session.verify_success_with_redirection(
+            return rpc_utils.verify_success_with_redirection(
                 connection.client.insertStringRecord(request)
             )
         except RedirectException as e:
@@ -565,7 +565,7 @@ class Session(object):
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.insertStringRecord(request)
                     )
                 except TTransport.TException as e1:
@@ -596,7 +596,7 @@ class Session(object):
         try:
             connection = self.get_connection(device_id)
             request.sessionId = connection.session_id
-            return Session.verify_success_with_redirection(
+            return rpc_utils.verify_success_with_redirection(
                 connection.client.insertStringRecord(request)
             )
         except RedirectException as e:
@@ -605,7 +605,7 @@ class Session(object):
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.insertStringRecord(request)
                     )
                 except TTransport.TException as e1:
@@ -645,7 +645,7 @@ class Session(object):
         try:
             connection = self.get_connection(device_id)
             request.sessionId = connection.session_id
-            return Session.verify_success_with_redirection(
+            return rpc_utils.verify_success_with_redirection(
                 connection.client.insertRecord(request)
             )
         except RedirectException as e:
@@ -654,7 +654,7 @@ class Session(object):
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return 
Session.verify_success(self.__client.insertRecord(request))
+                    return 
rpc_utils.verify_success(self.__client.insertRecord(request))
                 except TTransport.TException as e1:
                     raise IoTDBConnectionException(e1) from None
             else:
@@ -701,7 +701,7 @@ class Session(object):
                 )
             for client, request in request_group.items():
                 try:
-                    Session.verify_success_with_redirection_for_multi_devices(
+                    
rpc_utils.verify_success_with_redirection_for_multi_devices(
                         client.insertRecords(request), request.prefixPaths
                     )
                 except RedirectException as e:
@@ -711,7 +711,9 @@ class Session(object):
                     if self.reconnect():
                         try:
                             request.sessionId = self.__session_id
-                            
Session.verify_success(self.__client.insertRecords(request))
+                            rpc_utils.verify_success(
+                                self.__client.insertRecords(request)
+                            )
                         except TTransport.TException as e1:
                             raise IoTDBConnectionException(e1) from None
                     else:
@@ -725,12 +727,12 @@ class Session(object):
                 device_ids, times, measurements_lst, types_lst, values_lst
             )
             try:
-                return 
Session.verify_success(self.__client.insertRecords(request))
+                return 
rpc_utils.verify_success(self.__client.insertRecords(request))
             except TTransport.TException as e:
                 if self.reconnect():
                     try:
                         request.sessionId = self.__session_id
-                        return Session.verify_success(
+                        return rpc_utils.verify_success(
                             self.__client.insertRecords(request)
                         )
                     except TTransport.TException as e1:
@@ -774,7 +776,7 @@ class Session(object):
         try:
             connection = self.get_connection(device_id)
             request.sessionId = connection.session_id
-            return Session.verify_success_with_redirection(
+            return rpc_utils.verify_success_with_redirection(
                 connection.client.insertRecord(request)
             )
         except RedirectException as e:
@@ -783,7 +785,7 @@ class Session(object):
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return 
Session.verify_success(self.__client.insertRecord(request))
+                    return 
rpc_utils.verify_success(self.__client.insertRecord(request))
                 except TTransport.TException as e1:
                     raise IoTDBConnectionException(e1) from None
             else:
@@ -830,7 +832,7 @@ class Session(object):
                 )
             for client, request in request_group.items():
                 try:
-                    Session.verify_success_with_redirection_for_multi_devices(
+                    
rpc_utils.verify_success_with_redirection_for_multi_devices(
                         client.insertRecords(request), request.prefixPaths
                     )
                 except RedirectException as e:
@@ -840,7 +842,9 @@ class Session(object):
                     if self.reconnect():
                         try:
                             request.sessionId = self.__session_id
-                            
Session.verify_success(self.__client.insertRecords(request))
+                            rpc_utils.verify_success(
+                                self.__client.insertRecords(request)
+                            )
                         except TTransport.TException as e1:
                             raise IoTDBConnectionException(e1) from None
                     else:
@@ -854,12 +858,12 @@ class Session(object):
                 device_ids, times, measurements_lst, types_lst, values_lst, 
True
             )
             try:
-                return 
Session.verify_success(self.__client.insertRecords(request))
+                return 
rpc_utils.verify_success(self.__client.insertRecords(request))
             except TTransport.TException as e:
                 if self.reconnect():
                     try:
                         request.sessionId = self.__session_id
-                        return Session.verify_success(
+                        return rpc_utils.verify_success(
                             self.__client.insertRecords(request)
                         )
                     except TTransport.TException as e1:
@@ -885,11 +889,11 @@ class Session(object):
             device_id, timestamp, measurements, data_types, values
         )
         try:
-            return 
Session.verify_success(self.__client.testInsertRecord(request))
+            return 
rpc_utils.verify_success(self.__client.testInsertRecord(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.testInsertRecord(request)
                     )
                 except TTransport.TException as e1:
@@ -913,11 +917,11 @@ class Session(object):
             device_ids, times, measurements_lst, types_lst, values_lst
         )
         try:
-            return 
Session.verify_success(self.__client.testInsertRecords(request))
+            return 
rpc_utils.verify_success(self.__client.testInsertRecords(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.testInsertRecords(request)
                     )
                 except TTransport.TException as e1:
@@ -1008,7 +1012,7 @@ class Session(object):
         try:
             connection = self.get_connection(tablet.get_insert_target_name())
             request.sessionId = connection.session_id
-            return Session.verify_success_with_redirection(
+            return rpc_utils.verify_success_with_redirection(
                 connection.client.insertTablet(request)
             )
         except RedirectException as e:
@@ -1019,7 +1023,7 @@ class Session(object):
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return 
Session.verify_success(self.__client.insertTablet(request))
+                    return 
rpc_utils.verify_success(self.__client.insertTablet(request))
                 except TTransport.TException as e1:
                     raise IoTDBConnectionException(e1) from None
             else:
@@ -1048,7 +1052,7 @@ class Session(object):
                 request.typesList.append(tablet_lst[i].get_data_types())
             for client, request in request_group.items():
                 try:
-                    Session.verify_success_with_redirection_for_multi_devices(
+                    
rpc_utils.verify_success_with_redirection_for_multi_devices(
                         client.insertTablets(request), request.prefixPaths
                     )
                 except RedirectException as e:
@@ -1058,7 +1062,9 @@ class Session(object):
                     if self.reconnect():
                         try:
                             request.sessionId = self.__session_id
-                            
Session.verify_success(self.__client.insertTablets(request))
+                            rpc_utils.verify_success(
+                                self.__client.insertTablets(request)
+                            )
                         except TTransport.TException as e1:
                             raise IoTDBConnectionException(e1) from None
                     else:
@@ -1070,12 +1076,12 @@ class Session(object):
         else:
             request = self.gen_insert_tablets_req(tablet_lst)
             try:
-                return 
Session.verify_success(self.__client.insertTablets(request))
+                return 
rpc_utils.verify_success(self.__client.insertTablets(request))
             except TTransport.TException as e:
                 if self.reconnect():
                     try:
                         request.sessionId = self.__session_id
-                        return Session.verify_success(
+                        return rpc_utils.verify_success(
                             self.__client.insertTablets(request)
                         )
                     except TTransport.TException as e1:
@@ -1101,7 +1107,7 @@ class Session(object):
         try:
             connection = self.get_connection(tablet.get_insert_target_name())
             request.sessionId = connection.session_id
-            return Session.verify_success_with_redirection(
+            return rpc_utils.verify_success_with_redirection(
                 connection.client.insertTablet(request)
             )
         except RedirectException as e:
@@ -1112,7 +1118,7 @@ class Session(object):
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return 
Session.verify_success(self.__client.insertTablet(request))
+                    return 
rpc_utils.verify_success(self.__client.insertTablet(request))
                 except TTransport.TException as e1:
                     raise IoTDBConnectionException(e1) from None
             else:
@@ -1141,7 +1147,7 @@ class Session(object):
                 request.typesList.append(tablet_lst[i].get_data_types())
             for client, request in request_group.items():
                 try:
-                    Session.verify_success_with_redirection_for_multi_devices(
+                    
rpc_utils.verify_success_with_redirection_for_multi_devices(
                         client.insertTablets(request), request.prefixPaths
                     )
                 except RedirectException as e:
@@ -1151,7 +1157,9 @@ class Session(object):
                     if self.reconnect():
                         try:
                             request.sessionId = self.__session_id
-                            
Session.verify_success(self.__client.insertTablets(request))
+                            rpc_utils.verify_success(
+                                self.__client.insertTablets(request)
+                            )
                         except TTransport.TException as e1:
                             raise IoTDBConnectionException(e1) from None
                     else:
@@ -1163,12 +1171,12 @@ class Session(object):
         else:
             request = self.gen_insert_tablets_req(tablet_lst, True)
             try:
-                return 
Session.verify_success(self.__client.insertTablets(request))
+                return 
rpc_utils.verify_success(self.__client.insertTablets(request))
             except TTransport.TException as e:
                 if self.reconnect():
                     try:
                         request.sessionId = self.__session_id
-                        return Session.verify_success(
+                        return rpc_utils.verify_success(
                             self.__client.insertTablets(request)
                         )
                     except TTransport.TException as e1:
@@ -1191,7 +1199,7 @@ class Session(object):
         try:
             connection = self.get_connection(tablet.get_insert_target_name())
             request.sessionId = connection.session_id
-            return Session.verify_success_with_redirection(
+            return rpc_utils.verify_success_with_redirection(
                 connection.client.insertTablet(request)
             )
         except RedirectException as e:
@@ -1202,7 +1210,7 @@ class Session(object):
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return 
Session.verify_success(self.__client.insertTablet(request))
+                    return 
rpc_utils.verify_success(self.__client.insertTablet(request))
                 except TTransport.TException as e1:
                     raise IoTDBConnectionException(e1) from None
             else:
@@ -1261,7 +1269,7 @@ class Session(object):
         try:
             connection = self.get_connection(device_id)
             request.sessionId = connection.session_id
-            return Session.verify_success_with_redirection(
+            return rpc_utils.verify_success_with_redirection(
                 connection.client.insertRecordsOfOneDevice(request)
             )
         except RedirectException as e:
@@ -1270,7 +1278,7 @@ class Session(object):
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.insertRecordsOfOneDevice(request)
                     )
                 except TTransport.TException as e1:
@@ -1333,7 +1341,7 @@ class Session(object):
         try:
             connection = self.get_connection(device_id)
             request.sessionId = connection.session_id
-            return Session.verify_success_with_redirection(
+            return rpc_utils.verify_success_with_redirection(
                 connection.client.insertRecordsOfOneDevice(request)
             )
         except RedirectException as e:
@@ -1342,7 +1350,7 @@ class Session(object):
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.insertRecordsOfOneDevice(request)
                     )
                 except TTransport.TException as e1:
@@ -1387,12 +1395,12 @@ class Session(object):
         """
         request = self.gen_insert_tablet_req(tablet)
         try:
-            return 
Session.verify_success(self.__client.testInsertTablet(request))
+            return 
rpc_utils.verify_success(self.__client.testInsertTablet(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.testInsertTablet(request)
                     )
                 except TTransport.TException as e1:
@@ -1408,12 +1416,12 @@ class Session(object):
         """
         request = self.gen_insert_tablets_req(tablet_list)
         try:
-            return 
Session.verify_success(self.__client.testInsertTablets(request))
+            return 
rpc_utils.verify_success(self.__client.testInsertTablets(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.testInsertTablets(request)
                     )
                 except TTransport.TException as e1:
@@ -1495,7 +1503,7 @@ class Session(object):
             else:
                 raise IoTDBConnectionException(self.connection_error_msg()) 
from None
 
-        Session.verify_success(resp.status)
+        rpc_utils.verify_success(resp.status)
         return SessionDataSet(
             sql,
             resp.columns,
@@ -1544,7 +1552,7 @@ class Session(object):
                         connection.change_database(sql)
                     except Exception as e:
                         self.__endpoint_to_connection.pop(endpoint)
-        return Session.verify_success(resp.status)
+        return rpc_utils.verify_success(resp.status)
 
     def execute_statement(self, sql: str, timeout=0):
         request = TSExecuteStatementReq(
@@ -1563,7 +1571,7 @@ class Session(object):
             else:
                 raise IoTDBConnectionException(self.connection_error_msg()) 
from None
 
-        Session.verify_success(resp.status)
+        rpc_utils.verify_success(resp.status)
         if resp.columns:
             return SessionDataSet(
                 sql,
@@ -1694,61 +1702,6 @@ class Session(object):
                 return False
         return True
 
-    @staticmethod
-    def verify_success(status: TSStatus):
-        """
-        verify success of operation
-        :param status: execution result status
-        """
-        if status.code == Session.MULTIPLE_ERROR:
-            Session.verify_success_by_list(status.subStatus)
-            return 0
-        if (
-            status.code == Session.SUCCESS_STATUS
-            or status.code == Session.REDIRECTION_RECOMMEND
-        ):
-            return 0
-
-        raise RuntimeError(f"{status.code}: {status.message}")
-
-    @staticmethod
-    def verify_success_by_list(status_list: list):
-        """
-        verify success of operation
-        :param status_list: execution result status
-        """
-        error_messages = [
-            status.message
-            for status in status_list
-            if status.code
-            not in {Session.SUCCESS_STATUS, Session.REDIRECTION_RECOMMEND}
-        ]
-        if error_messages:
-            message = f"{Session.MULTIPLE_ERROR}: {'; '.join(error_messages)}"
-            raise RuntimeError(message)
-
-    @staticmethod
-    def verify_success_with_redirection(status: TSStatus):
-        Session.verify_success(status)
-        if status.redirectNode is not None:
-            raise RedirectException(status.redirectNode)
-        return 0
-
-    @staticmethod
-    def verify_success_with_redirection_for_multi_devices(
-        status: TSStatus, devices: list
-    ):
-        Session.verify_success(status)
-        if (
-            status.code == Session.MULTIPLE_ERROR
-            or status.code == Session.REDIRECTION_RECOMMEND
-        ):
-            device_to_endpoint = {}
-            for i in range(len(status.subStatus)):
-                if status.subStatus[i].redirectNode is not None:
-                    device_to_endpoint[devices[i]] = 
status.subStatus[i].redirectNode
-            raise RedirectException(device_to_endpoint)
-
     def execute_raw_data_query(
         self, paths: list, start_time: int, end_time: int
     ) -> SessionDataSet:
@@ -1780,7 +1733,7 @@ class Session(object):
                     raise IoTDBConnectionException(e1) from None
             else:
                 raise IoTDBConnectionException(self.connection_error_msg()) 
from None
-        Session.verify_success(resp.status)
+        rpc_utils.verify_success(resp.status)
         return SessionDataSet(
             "",
             resp.columns,
@@ -1825,7 +1778,7 @@ class Session(object):
                     raise IoTDBConnectionException(e1) from None
             else:
                 raise IoTDBConnectionException(self.connection_error_msg()) 
from None
-        Session.verify_success(resp.status)
+        rpc_utils.verify_success(resp.status)
         return SessionDataSet(
             "",
             resp.columns,
@@ -1871,7 +1824,7 @@ class Session(object):
         try:
             connection = self.get_connection(device_id)
             request.sessionId = connection.session_id
-            return Session.verify_success_with_redirection(
+            return rpc_utils.verify_success_with_redirection(
                 connection.client.insertStringRecordsOfOneDevice(request)
             )
         except RedirectException as e:
@@ -1880,7 +1833,7 @@ class Session(object):
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.insertStringRecordsOfOneDevice(request)
                     )
                 except TTransport.TException as e1:
@@ -1906,7 +1859,7 @@ class Session(object):
         try:
             connection = self.get_connection(device_id)
             request.sessionId = connection.session_id
-            return Session.verify_success_with_redirection(
+            return rpc_utils.verify_success_with_redirection(
                 connection.client.insertStringRecordsOfOneDevice(request)
             )
         except RedirectException as e:
@@ -1915,7 +1868,7 @@ class Session(object):
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.insertStringRecordsOfOneDevice(request)
                     )
                 except TTransport.TException as e1:
@@ -2092,12 +2045,12 @@ class Session(object):
             self.__session_id, template.get_name(), bytes_array
         )
         try:
-            return 
Session.verify_success(self.__client.createSchemaTemplate(request))
+            return 
rpc_utils.verify_success(self.__client.createSchemaTemplate(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.createSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
@@ -2117,12 +2070,12 @@ class Session(object):
         """
         request = TSDropSchemaTemplateReq(self.__session_id, template_name)
         try:
-            return 
Session.verify_success(self.__client.dropSchemaTemplate(request))
+            return 
rpc_utils.verify_success(self.__client.dropSchemaTemplate(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.dropSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
@@ -2164,12 +2117,12 @@ class Session(object):
             compressors,
         )
         try:
-            return 
Session.verify_success(self.__client.appendSchemaTemplate(request))
+            return 
rpc_utils.verify_success(self.__client.appendSchemaTemplate(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.appendSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
@@ -2190,12 +2143,12 @@ class Session(object):
         """
         request = TSPruneSchemaTemplateReq(self.__session_id, template_name, 
path)
         try:
-            return 
Session.verify_success(self.__client.pruneSchemaTemplate(request))
+            return 
rpc_utils.verify_success(self.__client.pruneSchemaTemplate(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.pruneSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
@@ -2216,12 +2169,12 @@ class Session(object):
         """
         request = TSSetSchemaTemplateReq(self.__session_id, template_name, 
prefix_path)
         try:
-            return 
Session.verify_success(self.__client.setSchemaTemplate(request))
+            return 
rpc_utils.verify_success(self.__client.setSchemaTemplate(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.setSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
@@ -2245,12 +2198,12 @@ class Session(object):
             self.__session_id, prefix_path, template_name
         )
         try:
-            return 
Session.verify_success(self.__client.unsetSchemaTemplate(request))
+            return 
rpc_utils.verify_success(self.__client.unsetSchemaTemplate(request))
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
-                    return Session.verify_success(
+                    return rpc_utils.verify_success(
                         self.__client.unsetSchemaTemplate(request)
                     )
                 except TTransport.TException as e1:
@@ -2281,7 +2234,7 @@ class Session(object):
                 try:
                     request.sessionId = self.__session_id
                     response = self.__client.querySchemaTemplate(request)
-                    Session.verify_success(response.status)
+                    rpc_utils.verify_success(response.status)
                     return response.count
                 except TTransport.TException as e1:
                     raise IoTDBConnectionException(e1) from None
@@ -2307,14 +2260,14 @@ class Session(object):
         )
         try:
             response = self.__client.querySchemaTemplate(request)
-            Session.verify_success(response.status)
+            rpc_utils.verify_success(response.status)
             return response.result
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     response = self.__client.querySchemaTemplate(request)
-                    Session.verify_success(response.status)
+                    rpc_utils.verify_success(response.status)
                     return response.result
                 except TTransport.TException as e1:
                     raise IoTDBConnectionException(e1) from None
@@ -2337,14 +2290,14 @@ class Session(object):
         )
         try:
             response = self.__client.querySchemaTemplate(request)
-            Session.verify_success(response.status)
+            rpc_utils.verify_success(response.status)
             return response.result
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     response = self.__client.querySchemaTemplate(request)
-                    Session.verify_success(response.status)
+                    rpc_utils.verify_success(response.status)
                     return response.result
                 except TTransport.TException as e1:
                     raise IoTDBConnectionException(e1) from None
@@ -2370,14 +2323,14 @@ class Session(object):
         )
         try:
             response = self.__client.querySchemaTemplate(request)
-            Session.verify_success(response.status)
+            rpc_utils.verify_success(response.status)
             return response.measurements
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     response = self.__client.querySchemaTemplate(request)
-                    Session.verify_success(response.status)
+                    rpc_utils.verify_success(response.status)
                     return response.measurements
                 except TTransport.TException as e1:
                     raise IoTDBConnectionException(e1) from None
@@ -2400,14 +2353,14 @@ class Session(object):
         )
         try:
             response = self.__client.querySchemaTemplate(request)
-            Session.verify_success(response.status)
+            rpc_utils.verify_success(response.status)
             return response.measurements
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     response = self.__client.querySchemaTemplate(request)
-                    Session.verify_success(response.status)
+                    rpc_utils.verify_success(response.status)
                     return response.measurements
                 except TTransport.TException as e1:
                     raise IoTDBConnectionException(e1) from None
@@ -2429,14 +2382,14 @@ class Session(object):
         )
         try:
             response = self.__client.querySchemaTemplate(request)
-            Session.verify_success(response.status)
+            rpc_utils.verify_success(response.status)
             return response.measurements
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     response = self.__client.querySchemaTemplate(request)
-                    Session.verify_success(response.status)
+                    rpc_utils.verify_success(response.status)
                     return response.measurements
                 except TTransport.TException as e1:
                     raise IoTDBConnectionException(e1) from None
@@ -2460,14 +2413,14 @@ class Session(object):
         )
         try:
             response = self.__client.querySchemaTemplate(request)
-            Session.verify_success(response.status)
+            rpc_utils.verify_success(response.status)
             return response.measurements
         except TTransport.TException as e:
             if self.reconnect():
                 try:
                     request.sessionId = self.__session_id
                     response = self.__client.querySchemaTemplate(request)
-                    Session.verify_success(response.status)
+                    rpc_utils.verify_success(response.status)
                     return response.measurements
                 except TTransport.TException as e1:
                     raise IoTDBConnectionException(e1) from None
@@ -2510,12 +2463,3 @@ class SessionConnection(object):
         finally:
             if self.transport is not None:
                 self.transport.close()
-
-
-class RedirectException(Exception):
-    def __init__(self, redirect_info):
-        Exception.__init__(self)
-        if isinstance(redirect_info, TEndPoint):
-            self.redirect_node = redirect_info
-        else:
-            self.device_to_endpoint = redirect_info
diff --git a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py 
b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
index fd0dec79fa7..0c4230381fa 100644
--- a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
+++ b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
@@ -22,11 +22,13 @@ import logging
 import numpy as np
 import pandas as pd
 from thrift.transport import TTransport
+
 from iotdb.thrift.rpc.IClientRPCService import TSFetchResultsReq, 
TSCloseOperationReq
 from iotdb.tsfile.utils.date_utils import parse_int_to_date
 from iotdb.tsfile.utils.tsblock_serde import deserialize
-from iotdb.utils.IoTDBConnectionException import IoTDBConnectionException
+from iotdb.utils.exception import IoTDBConnectionException
 from iotdb.utils.IoTDBConstants import TSDataType
+from iotdb.utils.rpc_utils import verify_success
 
 logger = logging.getLogger("IoTDB")
 TIMESTAMP_STR = "Time"
@@ -60,6 +62,7 @@ class IoTDBRpcDataSet(object):
         self.__fetch_size = fetch_size
         self.column_size = len(column_name_list)
         self.__time_out = time_out
+        self.__more_data = more_data
 
         self.__column_name_list = []
         self.__column_type_list = []
@@ -112,7 +115,6 @@ class IoTDBRpcDataSet(object):
         self.__query_result_index = 0
         self.__is_closed = False
         self.__empty_resultSet = False
-        self.__rows_index = 0
         self.has_cached_data_frame = False
         self.data_frame = None
 
@@ -146,7 +148,7 @@ class IoTDBRpcDataSet(object):
             return True
         if self.__empty_resultSet:
             return False
-        if self.fetch_results():
+        if self.__more_data and self.fetch_results():
             self.construct_one_data_frame()
             return True
         return False
@@ -254,7 +256,7 @@ class IoTDBRpcDataSet(object):
             return True
         if self.__empty_resultSet:
             return False
-        if self.fetch_results():
+        if self.__more_data and self.fetch_results():
             return True
         return False
 
@@ -405,7 +407,6 @@ class IoTDBRpcDataSet(object):
     def fetch_results(self):
         if self.__is_closed:
             raise IoTDBConnectionException("This DataSet is already closed")
-        self.__rows_index = 0
         request = TSFetchResultsReq(
             self.__session_id,
             self.__sql,
@@ -413,9 +414,12 @@ class IoTDBRpcDataSet(object):
             self.__query_id,
             True,
             self.__time_out,
+            self.__statement_id,
         )
         try:
             resp = self.__client.fetchResultsV2(request)
+            verify_success(resp.status)
+            self.__more_data = resp.moreData
             if not resp.hasResultSet:
                 self.__empty_resultSet = True
             else:
@@ -423,7 +427,7 @@ class IoTDBRpcDataSet(object):
                 self.__query_result_index = 0
             return resp.hasResultSet
         except TTransport.TException as e:
-            raise RuntimeError(
+            raise IoTDBConnectionException(
                 "Cannot fetch result from server, because of network 
connection: ", e
             )
 
diff --git a/iotdb-client/client-py/iotdb/utils/IoTDBConnectionException.py 
b/iotdb-client/client-py/iotdb/utils/exception.py
similarity index 64%
rename from iotdb-client/client-py/iotdb/utils/IoTDBConnectionException.py
rename to iotdb-client/client-py/iotdb/utils/exception.py
index 0f7b895d1f8..2e233c6e7b0 100644
--- a/iotdb-client/client-py/iotdb/utils/IoTDBConnectionException.py
+++ b/iotdb-client/client-py/iotdb/utils/exception.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+from iotdb.thrift.common.ttypes import TEndPoint, TSStatus
 
 
 class IoTDBConnectionException(Exception):
@@ -27,3 +28,22 @@ class IoTDBConnectionException(Exception):
             super().__init__(message, cause)
         else:
             super().__init__()
+
+
+class StatementExecutionException(Exception):
+    def __init__(self, status: TSStatus = None, message=None):
+        if status is not None:
+            super().__init__(f"{status.code}: {status.message}")
+        elif message is not None:
+            super().__init__(message)
+        else:
+            super().__init__()
+
+
+class RedirectException(Exception):
+    def __init__(self, redirect_info):
+        Exception.__init__(self)
+        if isinstance(redirect_info, TEndPoint):
+            self.redirect_node = redirect_info
+        else:
+            self.device_to_endpoint = redirect_info
diff --git a/iotdb-client/client-py/iotdb/utils/rpc_utils.py 
b/iotdb-client/client-py/iotdb/utils/rpc_utils.py
new file mode 100644
index 00000000000..6ceb39c6558
--- /dev/null
+++ b/iotdb-client/client-py/iotdb/utils/rpc_utils.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from iotdb.thrift.common.ttypes import TSStatus
+from iotdb.utils.exception import RedirectException, 
StatementExecutionException
+
+SUCCESS_STATUS = 200
+MULTIPLE_ERROR = 302
+REDIRECTION_RECOMMEND = 400
+
+
+def verify_success(status: TSStatus):
+    """
+    verify success of operation
+    :param status: execution result status
+    """
+    if status.code == MULTIPLE_ERROR:
+        verify_success_by_list(status.subStatus)
+        return 0
+    if status.code == SUCCESS_STATUS or status.code == REDIRECTION_RECOMMEND:
+        return 0
+
+    raise StatementExecutionException(status)
+
+
+def verify_success_by_list(status_list: list):
+    """
+    verify success of operation
+    :param status_list: execution result status
+    """
+    error_messages = [
+        status.message
+        for status in status_list
+        if status.code not in {SUCCESS_STATUS, REDIRECTION_RECOMMEND}
+    ]
+    if error_messages:
+        message = f"{MULTIPLE_ERROR}: {'; '.join(error_messages)}"
+        raise StatementExecutionException(message=message)
+
+
+def verify_success_with_redirection(status: TSStatus):
+    verify_success(status)
+    if status.redirectNode is not None:
+        raise RedirectException(status.redirectNode)
+    return 0
+
+
+def verify_success_with_redirection_for_multi_devices(status: TSStatus, 
devices: list):
+    verify_success(status)
+    if status.code == MULTIPLE_ERROR or status.code == REDIRECTION_RECOMMEND:
+        device_to_endpoint = {}
+        for i in range(len(status.subStatus)):
+            if status.subStatus[i].redirectNode is not None:
+                device_to_endpoint[devices[i]] = 
status.subStatus[i].redirectNode
+        raise RedirectException(device_to_endpoint)


Reply via email to