This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch query_v2_py
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/query_v2_py by this push:
new 9ad5507d6ec finished
9ad5507d6ec is described below
commit 9ad5507d6ecdeb879e00191065c2ff4711d9874a
Author: HTHou <[email protected]>
AuthorDate: Fri Mar 21 17:08:12 2025 +0800
finished
---
iotdb-client/client-py/iotdb/Session.py | 266 ++++++++-------------
.../client-py/iotdb/utils/IoTDBRpcDataSet.py | 16 +-
.../{IoTDBConnectionException.py => exception.py} | 20 ++
iotdb-client/client-py/iotdb/utils/rpc_utils.py | 69 ++++++
4 files changed, 204 insertions(+), 167 deletions(-)
diff --git a/iotdb-client/client-py/iotdb/Session.py b/iotdb-client/client-py/iotdb/Session.py
index dd49958f3a8..b4ede1c3400 100644
--- a/iotdb-client/client-py/iotdb/Session.py
+++ b/iotdb-client/client-py/iotdb/Session.py
@@ -60,16 +60,14 @@ from .thrift.rpc.ttypes import (
TSInsertStringRecordsOfOneDeviceReq,
)
from .tsfile.utils.date_utils import parse_date_to_int
-from .utils.IoTDBConnectionException import IoTDBConnectionException
+from .utils import rpc_utils
+from .utils.exception import IoTDBConnectionException, RedirectException
logger = logging.getLogger("IoTDB")
warnings.simplefilter("always", DeprecationWarning)
class Session(object):
- SUCCESS_STATUS = 200
- MULTIPLE_ERROR = 302
- REDIRECTION_RECOMMEND = 400
DEFAULT_FETCH_SIZE = 5000
DEFAULT_USER = "root"
DEFAULT_PASSWORD = "root"
@@ -207,7 +205,7 @@ class Session(object):
try:
open_resp = client.openSession(open_req)
- Session.verify_success(open_resp.status)
+ rpc_utils.verify_success(open_resp.status)
if self.protocol_version != open_resp.serverProtocolVersion:
logger.exception(
@@ -288,13 +286,13 @@ class Session(object):
:param group_name: String, database name (starts from root)
"""
try:
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.setStorageGroup(self.__session_id, group_name)
)
except TTransport.TException as e:
if self.reconnect():
try:
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.setStorageGroup(self.__session_id, group_name)
)
except TTransport.TException as e1:
@@ -316,13 +314,13 @@ class Session(object):
:param storage_group_lst: List, paths of the target databases.
"""
try:
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.deleteStorageGroups(self.__session_id, storage_group_lst)
)
except TTransport.TException as e:
if self.reconnect():
try:
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.deleteStorageGroups(
self.__session_id, storage_group_lst
)
@@ -366,12 +364,12 @@ class Session(object):
alias,
)
try:
- return Session.verify_success(self.__client.createTimeseries(request))
+ return rpc_utils.verify_success(self.__client.createTimeseries(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.createTimeseries(request)
)
except TTransport.TException as e1:
@@ -400,14 +398,14 @@ class Session(object):
compressor_lst,
)
try:
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.createAlignedTimeseries(request)
)
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.createAlignedTimeseries(request)
)
except TTransport.TException as e1:
@@ -450,12 +448,14 @@ class Session(object):
alias_lst,
)
try:
- return Session.verify_success(self.__client.createMultiTimeseries(request))
+ return rpc_utils.verify_success(
+ self.__client.createMultiTimeseries(request)
+ )
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.createMultiTimeseries(request)
)
except TTransport.TException as e1:
@@ -469,13 +469,13 @@ class Session(object):
:param paths_list: List of time series path, which should be complete (starts from root)
"""
try:
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.deleteTimeseries(self.__session_id, paths_list)
)
except TTransport.TException as e:
if self.reconnect():
try:
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.deleteTimeseries(self.__session_id, paths_list)
)
except TTransport.TException as e1:
@@ -504,12 +504,12 @@ class Session(object):
self.__session_id, paths_list, -9223372036854775808, end_time
)
try:
- return Session.verify_success(self.__client.deleteData(request))
+ return rpc_utils.verify_success(self.__client.deleteData(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.deleteData(request))
+ return rpc_utils.verify_success(self.__client.deleteData(request))
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
@@ -524,12 +524,12 @@ class Session(object):
"""
request = TSDeleteDataReq(self.__session_id, paths_list, start_time, end_time)
try:
- return Session.verify_success(self.__client.deleteData(request))
+ return rpc_utils.verify_success(self.__client.deleteData(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.deleteData(request))
+ return rpc_utils.verify_success(self.__client.deleteData(request))
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
@@ -556,7 +556,7 @@ class Session(object):
try:
connection = self.get_connection(device_id)
request.sessionId = connection.session_id
- return Session.verify_success_with_redirection(
+ return rpc_utils.verify_success_with_redirection(
connection.client.insertStringRecord(request)
)
except RedirectException as e:
@@ -565,7 +565,7 @@ class Session(object):
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.insertStringRecord(request)
)
except TTransport.TException as e1:
@@ -596,7 +596,7 @@ class Session(object):
try:
connection = self.get_connection(device_id)
request.sessionId = connection.session_id
- return Session.verify_success_with_redirection(
+ return rpc_utils.verify_success_with_redirection(
connection.client.insertStringRecord(request)
)
except RedirectException as e:
@@ -605,7 +605,7 @@ class Session(object):
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.insertStringRecord(request)
)
except TTransport.TException as e1:
@@ -645,7 +645,7 @@ class Session(object):
try:
connection = self.get_connection(device_id)
request.sessionId = connection.session_id
- return Session.verify_success_with_redirection(
+ return rpc_utils.verify_success_with_redirection(
connection.client.insertRecord(request)
)
except RedirectException as e:
@@ -654,7 +654,7 @@ class Session(object):
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertRecord(request))
+ return rpc_utils.verify_success(self.__client.insertRecord(request))
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
@@ -701,7 +701,7 @@ class Session(object):
)
for client, request in request_group.items():
try:
- Session.verify_success_with_redirection_for_multi_devices(
+ rpc_utils.verify_success_with_redirection_for_multi_devices(
client.insertRecords(request), request.prefixPaths
)
except RedirectException as e:
@@ -711,7 +711,9 @@ class Session(object):
if self.reconnect():
try:
request.sessionId = self.__session_id
- Session.verify_success(self.__client.insertRecords(request))
+ rpc_utils.verify_success(
+ self.__client.insertRecords(request)
+ )
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
@@ -725,12 +727,12 @@ class Session(object):
device_ids, times, measurements_lst, types_lst, values_lst
)
try:
- return Session.verify_success(self.__client.insertRecords(request))
+ return rpc_utils.verify_success(self.__client.insertRecords(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.insertRecords(request)
)
except TTransport.TException as e1:
@@ -774,7 +776,7 @@ class Session(object):
try:
connection = self.get_connection(device_id)
request.sessionId = connection.session_id
- return Session.verify_success_with_redirection(
+ return rpc_utils.verify_success_with_redirection(
connection.client.insertRecord(request)
)
except RedirectException as e:
@@ -783,7 +785,7 @@ class Session(object):
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertRecord(request))
+ return rpc_utils.verify_success(self.__client.insertRecord(request))
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
@@ -830,7 +832,7 @@ class Session(object):
)
for client, request in request_group.items():
try:
- Session.verify_success_with_redirection_for_multi_devices(
+ rpc_utils.verify_success_with_redirection_for_multi_devices(
client.insertRecords(request), request.prefixPaths
)
except RedirectException as e:
@@ -840,7 +842,9 @@ class Session(object):
if self.reconnect():
try:
request.sessionId = self.__session_id
- Session.verify_success(self.__client.insertRecords(request))
+ rpc_utils.verify_success(
+ self.__client.insertRecords(request)
+ )
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
@@ -854,12 +858,12 @@ class Session(object):
device_ids, times, measurements_lst, types_lst, values_lst, True
)
try:
- return Session.verify_success(self.__client.insertRecords(request))
+ return rpc_utils.verify_success(self.__client.insertRecords(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.insertRecords(request)
)
except TTransport.TException as e1:
@@ -885,11 +889,11 @@ class Session(object):
device_id, timestamp, measurements, data_types, values
)
try:
- return Session.verify_success(self.__client.testInsertRecord(request))
+ return rpc_utils.verify_success(self.__client.testInsertRecord(request))
except TTransport.TException as e:
if self.reconnect():
try:
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.testInsertRecord(request)
)
except TTransport.TException as e1:
@@ -913,11 +917,11 @@ class Session(object):
device_ids, times, measurements_lst, types_lst, values_lst
)
try:
- return Session.verify_success(self.__client.testInsertRecords(request))
+ return rpc_utils.verify_success(self.__client.testInsertRecords(request))
except TTransport.TException as e:
if self.reconnect():
try:
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.testInsertRecords(request)
)
except TTransport.TException as e1:
@@ -1008,7 +1012,7 @@ class Session(object):
try:
connection = self.get_connection(tablet.get_insert_target_name())
request.sessionId = connection.session_id
- return Session.verify_success_with_redirection(
+ return rpc_utils.verify_success_with_redirection(
connection.client.insertTablet(request)
)
except RedirectException as e:
@@ -1019,7 +1023,7 @@ class Session(object):
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertTablet(request))
+ return rpc_utils.verify_success(self.__client.insertTablet(request))
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
@@ -1048,7 +1052,7 @@ class Session(object):
request.typesList.append(tablet_lst[i].get_data_types())
for client, request in request_group.items():
try:
- Session.verify_success_with_redirection_for_multi_devices(
+ rpc_utils.verify_success_with_redirection_for_multi_devices(
client.insertTablets(request), request.prefixPaths
)
except RedirectException as e:
@@ -1058,7 +1062,9 @@ class Session(object):
if self.reconnect():
try:
request.sessionId = self.__session_id
- Session.verify_success(self.__client.insertTablets(request))
+ rpc_utils.verify_success(
+ self.__client.insertTablets(request)
+ )
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
@@ -1070,12 +1076,12 @@ class Session(object):
else:
request = self.gen_insert_tablets_req(tablet_lst)
try:
- return Session.verify_success(self.__client.insertTablets(request))
+ return rpc_utils.verify_success(self.__client.insertTablets(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.insertTablets(request)
)
except TTransport.TException as e1:
@@ -1101,7 +1107,7 @@ class Session(object):
try:
connection = self.get_connection(tablet.get_insert_target_name())
request.sessionId = connection.session_id
- return Session.verify_success_with_redirection(
+ return rpc_utils.verify_success_with_redirection(
connection.client.insertTablet(request)
)
except RedirectException as e:
@@ -1112,7 +1118,7 @@ class Session(object):
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertTablet(request))
+ return rpc_utils.verify_success(self.__client.insertTablet(request))
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
@@ -1141,7 +1147,7 @@ class Session(object):
request.typesList.append(tablet_lst[i].get_data_types())
for client, request in request_group.items():
try:
- Session.verify_success_with_redirection_for_multi_devices(
+ rpc_utils.verify_success_with_redirection_for_multi_devices(
client.insertTablets(request), request.prefixPaths
)
except RedirectException as e:
@@ -1151,7 +1157,9 @@ class Session(object):
if self.reconnect():
try:
request.sessionId = self.__session_id
- Session.verify_success(self.__client.insertTablets(request))
+ rpc_utils.verify_success(
+ self.__client.insertTablets(request)
+ )
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
@@ -1163,12 +1171,12 @@ class Session(object):
else:
request = self.gen_insert_tablets_req(tablet_lst, True)
try:
- return Session.verify_success(self.__client.insertTablets(request))
+ return rpc_utils.verify_success(self.__client.insertTablets(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.insertTablets(request)
)
except TTransport.TException as e1:
@@ -1191,7 +1199,7 @@ class Session(object):
try:
connection = self.get_connection(tablet.get_insert_target_name())
request.sessionId = connection.session_id
- return Session.verify_success_with_redirection(
+ return rpc_utils.verify_success_with_redirection(
connection.client.insertTablet(request)
)
except RedirectException as e:
@@ -1202,7 +1210,7 @@ class Session(object):
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(self.__client.insertTablet(request))
+ return rpc_utils.verify_success(self.__client.insertTablet(request))
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
else:
@@ -1261,7 +1269,7 @@ class Session(object):
try:
connection = self.get_connection(device_id)
request.sessionId = connection.session_id
- return Session.verify_success_with_redirection(
+ return rpc_utils.verify_success_with_redirection(
connection.client.insertRecordsOfOneDevice(request)
)
except RedirectException as e:
@@ -1270,7 +1278,7 @@ class Session(object):
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.insertRecordsOfOneDevice(request)
)
except TTransport.TException as e1:
@@ -1333,7 +1341,7 @@ class Session(object):
try:
connection = self.get_connection(device_id)
request.sessionId = connection.session_id
- return Session.verify_success_with_redirection(
+ return rpc_utils.verify_success_with_redirection(
connection.client.insertRecordsOfOneDevice(request)
)
except RedirectException as e:
@@ -1342,7 +1350,7 @@ class Session(object):
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.insertRecordsOfOneDevice(request)
)
except TTransport.TException as e1:
@@ -1387,12 +1395,12 @@ class Session(object):
"""
request = self.gen_insert_tablet_req(tablet)
try:
- return Session.verify_success(self.__client.testInsertTablet(request))
+ return rpc_utils.verify_success(self.__client.testInsertTablet(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.testInsertTablet(request)
)
except TTransport.TException as e1:
@@ -1408,12 +1416,12 @@ class Session(object):
"""
request = self.gen_insert_tablets_req(tablet_list)
try:
- return Session.verify_success(self.__client.testInsertTablets(request))
+ return rpc_utils.verify_success(self.__client.testInsertTablets(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.testInsertTablets(request)
)
except TTransport.TException as e1:
@@ -1495,7 +1503,7 @@ class Session(object):
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
- Session.verify_success(resp.status)
+ rpc_utils.verify_success(resp.status)
return SessionDataSet(
sql,
resp.columns,
@@ -1544,7 +1552,7 @@ class Session(object):
connection.change_database(sql)
except Exception as e:
self.__endpoint_to_connection.pop(endpoint)
- return Session.verify_success(resp.status)
+ return rpc_utils.verify_success(resp.status)
def execute_statement(self, sql: str, timeout=0):
request = TSExecuteStatementReq(
@@ -1563,7 +1571,7 @@ class Session(object):
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
- Session.verify_success(resp.status)
+ rpc_utils.verify_success(resp.status)
if resp.columns:
return SessionDataSet(
sql,
@@ -1694,61 +1702,6 @@ class Session(object):
return False
return True
- @staticmethod
- def verify_success(status: TSStatus):
- """
- verify success of operation
- :param status: execution result status
- """
- if status.code == Session.MULTIPLE_ERROR:
- Session.verify_success_by_list(status.subStatus)
- return 0
- if (
- status.code == Session.SUCCESS_STATUS
- or status.code == Session.REDIRECTION_RECOMMEND
- ):
- return 0
-
- raise RuntimeError(f"{status.code}: {status.message}")
-
- @staticmethod
- def verify_success_by_list(status_list: list):
- """
- verify success of operation
- :param status_list: execution result status
- """
- error_messages = [
- status.message
- for status in status_list
- if status.code
- not in {Session.SUCCESS_STATUS, Session.REDIRECTION_RECOMMEND}
- ]
- if error_messages:
- message = f"{Session.MULTIPLE_ERROR}: {'; '.join(error_messages)}"
- raise RuntimeError(message)
-
- @staticmethod
- def verify_success_with_redirection(status: TSStatus):
- Session.verify_success(status)
- if status.redirectNode is not None:
- raise RedirectException(status.redirectNode)
- return 0
-
- @staticmethod
- def verify_success_with_redirection_for_multi_devices(
- status: TSStatus, devices: list
- ):
- Session.verify_success(status)
- if (
- status.code == Session.MULTIPLE_ERROR
- or status.code == Session.REDIRECTION_RECOMMEND
- ):
- device_to_endpoint = {}
- for i in range(len(status.subStatus)):
- if status.subStatus[i].redirectNode is not None:
- device_to_endpoint[devices[i]] = status.subStatus[i].redirectNode
- raise RedirectException(device_to_endpoint)
-
def execute_raw_data_query(
self, paths: list, start_time: int, end_time: int
) -> SessionDataSet:
@@ -1780,7 +1733,7 @@ class Session(object):
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
- Session.verify_success(resp.status)
+ rpc_utils.verify_success(resp.status)
return SessionDataSet(
"",
resp.columns,
@@ -1825,7 +1778,7 @@ class Session(object):
raise IoTDBConnectionException(e1) from None
else:
raise IoTDBConnectionException(self.connection_error_msg()) from None
- Session.verify_success(resp.status)
+ rpc_utils.verify_success(resp.status)
return SessionDataSet(
"",
resp.columns,
@@ -1871,7 +1824,7 @@ class Session(object):
try:
connection = self.get_connection(device_id)
request.sessionId = connection.session_id
- return Session.verify_success_with_redirection(
+ return rpc_utils.verify_success_with_redirection(
connection.client.insertStringRecordsOfOneDevice(request)
)
except RedirectException as e:
@@ -1880,7 +1833,7 @@ class Session(object):
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.insertStringRecordsOfOneDevice(request)
)
except TTransport.TException as e1:
@@ -1906,7 +1859,7 @@ class Session(object):
try:
connection = self.get_connection(device_id)
request.sessionId = connection.session_id
- return Session.verify_success_with_redirection(
+ return rpc_utils.verify_success_with_redirection(
connection.client.insertStringRecordsOfOneDevice(request)
)
except RedirectException as e:
@@ -1915,7 +1868,7 @@ class Session(object):
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.insertStringRecordsOfOneDevice(request)
)
except TTransport.TException as e1:
@@ -2092,12 +2045,12 @@ class Session(object):
self.__session_id, template.get_name(), bytes_array
)
try:
- return Session.verify_success(self.__client.createSchemaTemplate(request))
+ return rpc_utils.verify_success(self.__client.createSchemaTemplate(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.createSchemaTemplate(request)
)
except TTransport.TException as e1:
@@ -2117,12 +2070,12 @@ class Session(object):
"""
request = TSDropSchemaTemplateReq(self.__session_id, template_name)
try:
- return Session.verify_success(self.__client.dropSchemaTemplate(request))
+ return rpc_utils.verify_success(self.__client.dropSchemaTemplate(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.dropSchemaTemplate(request)
)
except TTransport.TException as e1:
@@ -2164,12 +2117,12 @@ class Session(object):
compressors,
)
try:
- return Session.verify_success(self.__client.appendSchemaTemplate(request))
+ return rpc_utils.verify_success(self.__client.appendSchemaTemplate(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.appendSchemaTemplate(request)
)
except TTransport.TException as e1:
@@ -2190,12 +2143,12 @@ class Session(object):
"""
request = TSPruneSchemaTemplateReq(self.__session_id, template_name, path)
try:
- return Session.verify_success(self.__client.pruneSchemaTemplate(request))
+ return rpc_utils.verify_success(self.__client.pruneSchemaTemplate(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.pruneSchemaTemplate(request)
)
except TTransport.TException as e1:
@@ -2216,12 +2169,12 @@ class Session(object):
"""
request = TSSetSchemaTemplateReq(self.__session_id, template_name, prefix_path)
try:
- return Session.verify_success(self.__client.setSchemaTemplate(request))
+ return rpc_utils.verify_success(self.__client.setSchemaTemplate(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.setSchemaTemplate(request)
)
except TTransport.TException as e1:
@@ -2245,12 +2198,12 @@ class Session(object):
self.__session_id, prefix_path, template_name
)
try:
- return Session.verify_success(self.__client.unsetSchemaTemplate(request))
+ return rpc_utils.verify_success(self.__client.unsetSchemaTemplate(request))
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
- return Session.verify_success(
+ return rpc_utils.verify_success(
self.__client.unsetSchemaTemplate(request)
)
except TTransport.TException as e1:
@@ -2281,7 +2234,7 @@ class Session(object):
try:
request.sessionId = self.__session_id
response = self.__client.querySchemaTemplate(request)
- Session.verify_success(response.status)
+ rpc_utils.verify_success(response.status)
return response.count
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
@@ -2307,14 +2260,14 @@ class Session(object):
)
try:
response = self.__client.querySchemaTemplate(request)
- Session.verify_success(response.status)
+ rpc_utils.verify_success(response.status)
return response.result
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
response = self.__client.querySchemaTemplate(request)
- Session.verify_success(response.status)
+ rpc_utils.verify_success(response.status)
return response.result
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
@@ -2337,14 +2290,14 @@ class Session(object):
)
try:
response = self.__client.querySchemaTemplate(request)
- Session.verify_success(response.status)
+ rpc_utils.verify_success(response.status)
return response.result
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
response = self.__client.querySchemaTemplate(request)
- Session.verify_success(response.status)
+ rpc_utils.verify_success(response.status)
return response.result
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
@@ -2370,14 +2323,14 @@ class Session(object):
)
try:
response = self.__client.querySchemaTemplate(request)
- Session.verify_success(response.status)
+ rpc_utils.verify_success(response.status)
return response.measurements
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
response = self.__client.querySchemaTemplate(request)
- Session.verify_success(response.status)
+ rpc_utils.verify_success(response.status)
return response.measurements
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
@@ -2400,14 +2353,14 @@ class Session(object):
)
try:
response = self.__client.querySchemaTemplate(request)
- Session.verify_success(response.status)
+ rpc_utils.verify_success(response.status)
return response.measurements
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
response = self.__client.querySchemaTemplate(request)
- Session.verify_success(response.status)
+ rpc_utils.verify_success(response.status)
return response.measurements
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
@@ -2429,14 +2382,14 @@ class Session(object):
)
try:
response = self.__client.querySchemaTemplate(request)
- Session.verify_success(response.status)
+ rpc_utils.verify_success(response.status)
return response.measurements
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
response = self.__client.querySchemaTemplate(request)
- Session.verify_success(response.status)
+ rpc_utils.verify_success(response.status)
return response.measurements
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
@@ -2460,14 +2413,14 @@ class Session(object):
)
try:
response = self.__client.querySchemaTemplate(request)
- Session.verify_success(response.status)
+ rpc_utils.verify_success(response.status)
return response.measurements
except TTransport.TException as e:
if self.reconnect():
try:
request.sessionId = self.__session_id
response = self.__client.querySchemaTemplate(request)
- Session.verify_success(response.status)
+ rpc_utils.verify_success(response.status)
return response.measurements
except TTransport.TException as e1:
raise IoTDBConnectionException(e1) from None
@@ -2510,12 +2463,3 @@ class SessionConnection(object):
finally:
if self.transport is not None:
self.transport.close()
-
-
-class RedirectException(Exception):
- def __init__(self, redirect_info):
- Exception.__init__(self)
- if isinstance(redirect_info, TEndPoint):
- self.redirect_node = redirect_info
- else:
- self.device_to_endpoint = redirect_info
diff --git a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
index fd0dec79fa7..0c4230381fa 100644
--- a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
+++ b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
@@ -22,11 +22,13 @@ import logging
import numpy as np
import pandas as pd
from thrift.transport import TTransport
+
from iotdb.thrift.rpc.IClientRPCService import TSFetchResultsReq, TSCloseOperationReq
from iotdb.tsfile.utils.date_utils import parse_int_to_date
from iotdb.tsfile.utils.tsblock_serde import deserialize
-from iotdb.utils.IoTDBConnectionException import IoTDBConnectionException
+from iotdb.utils.exception import IoTDBConnectionException
from iotdb.utils.IoTDBConstants import TSDataType
+from iotdb.utils.rpc_utils import verify_success
logger = logging.getLogger("IoTDB")
TIMESTAMP_STR = "Time"
@@ -60,6 +62,7 @@ class IoTDBRpcDataSet(object):
self.__fetch_size = fetch_size
self.column_size = len(column_name_list)
self.__time_out = time_out
+ self.__more_data = more_data
self.__column_name_list = []
self.__column_type_list = []
@@ -112,7 +115,6 @@ class IoTDBRpcDataSet(object):
self.__query_result_index = 0
self.__is_closed = False
self.__empty_resultSet = False
- self.__rows_index = 0
self.has_cached_data_frame = False
self.data_frame = None
@@ -146,7 +148,7 @@ class IoTDBRpcDataSet(object):
return True
if self.__empty_resultSet:
return False
- if self.fetch_results():
+ if self.__more_data and self.fetch_results():
self.construct_one_data_frame()
return True
return False
@@ -254,7 +256,7 @@ class IoTDBRpcDataSet(object):
return True
if self.__empty_resultSet:
return False
- if self.fetch_results():
+ if self.__more_data and self.fetch_results():
return True
return False
@@ -405,7 +407,6 @@ class IoTDBRpcDataSet(object):
def fetch_results(self):
if self.__is_closed:
raise IoTDBConnectionException("This DataSet is already closed")
- self.__rows_index = 0
request = TSFetchResultsReq(
self.__session_id,
self.__sql,
@@ -413,9 +414,12 @@ class IoTDBRpcDataSet(object):
self.__query_id,
True,
self.__time_out,
+ self.__statement_id,
)
try:
resp = self.__client.fetchResultsV2(request)
+ verify_success(resp.status)
+ self.__more_data = resp.moreData
if not resp.hasResultSet:
self.__empty_resultSet = True
else:
@@ -423,7 +427,7 @@ class IoTDBRpcDataSet(object):
self.__query_result_index = 0
return resp.hasResultSet
except TTransport.TException as e:
- raise RuntimeError(
+ raise IoTDBConnectionException(
"Cannot fetch result from server, because of network connection: ", e
)
diff --git a/iotdb-client/client-py/iotdb/utils/IoTDBConnectionException.py b/iotdb-client/client-py/iotdb/utils/exception.py
similarity index 64%
rename from iotdb-client/client-py/iotdb/utils/IoTDBConnectionException.py
rename to iotdb-client/client-py/iotdb/utils/exception.py
index 0f7b895d1f8..2e233c6e7b0 100644
--- a/iotdb-client/client-py/iotdb/utils/IoTDBConnectionException.py
+++ b/iotdb-client/client-py/iotdb/utils/exception.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
#
+from iotdb.thrift.common.ttypes import TEndPoint, TSStatus
class IoTDBConnectionException(Exception):
@@ -27,3 +28,22 @@ class IoTDBConnectionException(Exception):
super().__init__(message, cause)
else:
super().__init__()
+
+
+class StatementExecutionException(Exception):
+ def __init__(self, status: TSStatus = None, message=None):
+ if status is not None:
+ super().__init__(f"{status.code}: {status.message}")
+ elif message is not None:
+ super().__init__(message)
+ else:
+ super().__init__()
+
+
+class RedirectException(Exception):
+ def __init__(self, redirect_info):
+ Exception.__init__(self)
+ if isinstance(redirect_info, TEndPoint):
+ self.redirect_node = redirect_info
+ else:
+ self.device_to_endpoint = redirect_info
diff --git a/iotdb-client/client-py/iotdb/utils/rpc_utils.py b/iotdb-client/client-py/iotdb/utils/rpc_utils.py
new file mode 100644
index 00000000000..6ceb39c6558
--- /dev/null
+++ b/iotdb-client/client-py/iotdb/utils/rpc_utils.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from iotdb.thrift.common.ttypes import TSStatus
+from iotdb.utils.exception import RedirectException, StatementExecutionException
+
+SUCCESS_STATUS = 200
+MULTIPLE_ERROR = 302
+REDIRECTION_RECOMMEND = 400
+
+
+def verify_success(status: TSStatus):
+ """
+ verify success of operation
+ :param status: execution result status
+ """
+ if status.code == MULTIPLE_ERROR:
+ verify_success_by_list(status.subStatus)
+ return 0
+ if status.code == SUCCESS_STATUS or status.code == REDIRECTION_RECOMMEND:
+ return 0
+
+ raise StatementExecutionException(status)
+
+
+def verify_success_by_list(status_list: list):
+ """
+ verify success of operation
+ :param status_list: execution result status
+ """
+ error_messages = [
+ status.message
+ for status in status_list
+ if status.code not in {SUCCESS_STATUS, REDIRECTION_RECOMMEND}
+ ]
+ if error_messages:
+ message = f"{MULTIPLE_ERROR}: {'; '.join(error_messages)}"
+ raise StatementExecutionException(message=message)
+
+
+def verify_success_with_redirection(status: TSStatus):
+ verify_success(status)
+ if status.redirectNode is not None:
+ raise RedirectException(status.redirectNode)
+ return 0
+
+
+def verify_success_with_redirection_for_multi_devices(status: TSStatus, devices: list):
+ verify_success(status)
+ if status.code == MULTIPLE_ERROR or status.code == REDIRECTION_RECOMMEND:
+ device_to_endpoint = {}
+ for i in range(len(status.subStatus)):
+ if status.subStatus[i].redirectNode is not None:
+ device_to_endpoint[devices[i]] = status.subStatus[i].redirectNode
+ raise RedirectException(device_to_endpoint)