This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch multi_node_13 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aa08d878a06d449a542abce3ca0613a54cb19170 Author: Haonan <[email protected]> AuthorDate: Wed Mar 22 09:51:58 2023 +0800 [IOTDB-5711] Support connecting multiple nodes in Python API (#9400) --- client-py/SessionExample.py | 10 +- client-py/iotdb/Session.py | 812 ++++++++++++++++----- client-py/tests/test_dataframe.py | 5 +- client-py/tests/test_delete_data.py | 9 +- .../UserGuide/API/Programming-Python-Native-API.md | 21 +- .../UserGuide/API/Programming-Python-Native-API.md | 21 +- 6 files changed, 672 insertions(+), 206 deletions(-) diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py index bbc9669527..6dda1c49e0 100644 --- a/client-py/SessionExample.py +++ b/client-py/SessionExample.py @@ -29,7 +29,14 @@ ip = "127.0.0.1" port_ = "6667" username_ = "root" password_ = "root" -session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") +# session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") +session = Session.init_from_node_urls( + node_urls=["127.0.0.1:6667", "127.0.0.1:6668"], + user="root", + password="root", + fetch_size=1024, + zone_id="UTC+8", +) session.open(False) # set and delete storage groups @@ -215,6 +222,7 @@ np_tablet_unsorted = NumpyTablet( np_values_unsorted, np_timestamps_unsorted, ) + session.insert_tablet(np_tablet_unsorted) print(np_tablet_unsorted.get_timestamps()) for value in np_tablet_unsorted.get_values(): diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py index 145fd94370..cf6e8ebc22 100644 --- a/client-py/iotdb/Session.py +++ b/client-py/iotdb/Session.py @@ -16,14 +16,21 @@ # under the License. # import logging +import random import struct import time -from iotdb.utils.SessionDataSet import SessionDataSet from thrift.protocol import TBinaryProtocol, TCompactProtocol from thrift.transport import TSocket, TTransport +from iotdb.utils.SessionDataSet import SessionDataSet +from .thrift.rpc.ttypes import ( + EndPoint, + TSRawDataQueryReq, + TSLastDataQueryReq, + TSInsertStringRecordsOfOneDeviceReq, +) from .thrift.rpc.TSIService import ( Client, TSCreateTimeseriesReq, @@ -34,7 +41,6 @@ from .thrift.rpc.TSIService import ( TSExecuteStatementReq, TSOpenSessionReq, TSCreateMultiTimeseriesReq, - TSCreateSchemaTemplateReq, TSCloseSessionReq, TSInsertTabletsReq, TSInsertRecordsReq, @@ -65,6 +71,7 @@ class Session(object): DEFAULT_USER = "root" DEFAULT_PASSWORD = "root" DEFAULT_ZONE_ID = time.strftime("%z") + RETRY_NUM = 3 def __init__( self, @@ -77,6 +84,9 @@ class Session(object): ): self.__host = host self.__port = port + self.__hosts = None + self.__ports = None + self.__default_endpoint = EndPoint(self.__host, self.__port) self.__user = user self.__password = password self.__fetch_size = fetch_size @@ -87,21 +97,60 @@ class Session(object): self.__session_id = None self.__statement_id = None self.__zone_id = zone_id + self.__enable_rpc_compression = None + + @classmethod + def init_from_node_urls( + cls, + node_urls, + user=DEFAULT_USER, + password=DEFAULT_PASSWORD, + fetch_size=DEFAULT_FETCH_SIZE, + zone_id=DEFAULT_ZONE_ID, + ): + if node_urls is None: + raise RuntimeError("node urls is empty") + session = Session(None, None, user, password, fetch_size, zone_id) + session.__hosts = [] + session.__ports = [] + for node_url in node_urls: + split = node_url.split(":") + session.__hosts.append(split[0]) + session.__ports.append(split[1]) + session.__host = session.__hosts[0] + session.__port = session.__ports[0] + session.__default_endpoint = EndPoint(session.__host, session.__port) + return session def open(self, enable_rpc_compression): if not self.__is_close: return + self.__enable_rpc_compression = enable_rpc_compression + if self.__hosts is None: + self.init_connection(self.__default_endpoint) + else: + for i in range(0, len(self.__hosts)): + self.__default_endpoint = EndPoint(self.__hosts[i], self.__ports[i]) + try: + self.init_connection(self.__default_endpoint) + except Exception as e: + if not self.reconnect(): + logger.error("Cluster has no nodes to connect") + raise e + break + + def init_connection(self, endpoint): self.__transport = TTransport.TFramedTransport( - TSocket.TSocket(self.__host, self.__port) + TSocket.TSocket(endpoint.ip, endpoint.port) ) if not self.__transport.isOpen(): try: self.__transport.open() except TTransport.TTransportException as e: - logger.exception("TTransportException!", exc_info=e) + raise e - if enable_rpc_compression: + if self.__enable_rpc_compression: self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport)) else: self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport)) @@ -133,6 +182,7 @@ class Session(object): except Exception as e: self.__transport.close() logger.exception("session closed because: ", exc_info=e) + raise e if self.__zone_id is not None: self.set_time_zone(self.__zone_id) @@ -165,12 +215,21 @@ class Session(object): set one storage group :param group_name: String, storage group name (starts from root) """ - status = self.__client.setStorageGroup(self.__session_id, group_name) - logger.debug( - "setting storage group {} message: {}".format(group_name, status.message) - ) - - return Session.verify_success(status) + try: + return Session.verify_success( + self.__client.setStorageGroup(self.__session_id, group_name) + ) + except TTransport.TException as e: + if self.reconnect(): + try: + return Session.verify_success( + self.__client.setStorageGroup(self.__session_id, group_name) + ) + except TTransport.TException as e1: + logger.exception("create databases fails because: ", e1) + raise e1 + else: + raise e def delete_storage_group(self, storage_group): """ @@ -185,14 +244,23 @@ class Session(object): delete multiple storage groups. :param storage_group_lst: List, paths of the target storage groups. """ - status = self.__client.deleteStorageGroups(self.__session_id, storage_group_lst) - logger.debug( - "delete storage group(s) {} message: {}".format( - storage_group_lst, status.message + try: + return Session.verify_success( + self.__client.deleteStorageGroups(self.__session_id, storage_group_lst) ) - ) - - return Session.verify_success(status) + except TTransport.TException as e: + if self.reconnect(): + try: + return Session.verify_success( + self.__client.deleteStorageGroups( + self.__session_id, storage_group_lst + ) + ) + except TTransport.TException as e1: + logger.exception("delete database fails because: ", e1) + raise e1 + else: + raise e def create_time_series( self, @@ -230,12 +298,20 @@ class Session(object): attributes, alias, ) - status = self.__client.createTimeseries(request) - logger.debug( - "creating time series {} message: {}".format(ts_path, status.message) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.createTimeseries(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.createTimeseries(request) + ) + except TTransport.TException as e1: + logger.exception("creating time series fails because: ", e1) + raise e1 + else: + raise e def create_aligned_time_series( self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst @@ -260,14 +336,21 @@ class Session(object): encoding_lst, compressor_lst, ) - status = self.__client.createAlignedTimeseries(request) - logger.debug( - "creating aligned time series of device {} message: {}".format( - measurements_lst, status.message + try: + return Session.verify_success( + self.__client.createAlignedTimeseries(request) ) - ) - - return Session.verify_success(status) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.createAlignedTimeseries(request) + ) + except TTransport.TException as e1: + logger.exception("creating time series fails because: ", e1) + raise e1 + raise e def create_multi_time_series( self, @@ -306,28 +389,39 @@ class Session(object): attributes_lst, alias_lst, ) - status = self.__client.createMultiTimeseries(request) - logger.debug( - "creating multiple time series {} message: {}".format( - ts_path_lst, status.message - ) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.createMultiTimeseries(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.createMultiTimeseries(request) + ) + except TTransport.TException as e1: + logger.exception("creating multi time series fails because: ", e1) + raise e1 + else: + raise e def delete_time_series(self, paths_list): """ delete multiple time series, including data and schema :param paths_list: List of time series path, which should be complete (starts from root) """ - status = self.__client.deleteTimeseries(self.__session_id, paths_list) - logger.debug( - "deleting multiple time series {} message: {}".format( - paths_list, status.message + try: + return Session.verify_success( + self.__client.deleteTimeseries(self.__session_id, paths_list) ) - ) - - return Session.verify_success(status) + except TTransport.TException: + if self.reconnect(): + try: + return Session.verify_success( + self.__client.deleteTimeseries(self.__session_id, paths_list) + ) + except TTransport.TException as e1: + logger.exception("deleting time series fails because: ", e1) + raise e1 def check_time_series_exists(self, path): """ @@ -346,14 +440,21 @@ class Session(object): :param paths_list: time series list that the data in. :param end_time: data with time stamp less than or equal to time will be deleted. """ - request = TSDeleteDataReq(self.__session_id, paths_list, -9223372036854775808, end_time) + request = TSDeleteDataReq( + self.__session_id, paths_list, -9223372036854775808, end_time + ) try: - status = self.__client.deleteData(request) - logger.debug( - "delete data from {}, message: {}".format(paths_list, status.message) - ) + return Session.verify_success(self.__client.deleteData(request)) except TTransport.TException as e: - logger.exception("data deletion fails because: ", e) + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.deleteData(request)) + except TTransport.TException as e1: + logger.exception("data deletion fails because: ", e1) + raise e1 + else: + raise e def delete_data_in_range(self, paths_list, start_time, end_time): """ @@ -364,12 +465,17 @@ class Session(object): """ request = TSDeleteDataReq(self.__session_id, paths_list, start_time, end_time) try: - status = self.__client.deleteData(request) - logger.debug( - "delete data from {}, message: {}".format(paths_list, status.message) - ) + return Session.verify_success(self.__client.deleteData(request)) except TTransport.TException as e: - logger.exception("data deletion fails because: ", e) + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.deleteData(request)) + except TTransport.TException as e1: + logger.exception("data deletion fails because: ", e1) + raise e1 + else: + raise e def insert_str_record(self, device_id, timestamp, measurements, string_values): """special case for inserting one row of String (TEXT) value""" @@ -377,18 +483,23 @@ class Session(object): string_values = [string_values] if type(measurements) == str: measurements = [measurements] - data_types = [TSDataType.TEXT.value for _ in string_values] request = self.gen_insert_str_record_req( - device_id, timestamp, measurements, data_types, string_values - ) - status = self.__client.insertStringRecord(request) - logger.debug( - "insert one record to device {} message: {}".format( - device_id, status.message - ) + device_id, timestamp, measurements, string_values ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.insertStringRecord(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.insertStringRecord(request) + ) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_aligned_str_record( self, device_id, timestamp, measurements, string_values @@ -398,18 +509,23 @@ class Session(object): string_values = [string_values] if type(measurements) == str: measurements = [measurements] - data_types = [TSDataType.TEXT.value for _ in string_values] request = self.gen_insert_str_record_req( - device_id, timestamp, measurements, data_types, string_values, True - ) - status = self.__client.insertStringRecord(request) - logger.debug( - "insert one record to device {} message: {}".format( - device_id, status.message - ) + device_id, timestamp, measurements, string_values, True ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.insertStringRecord(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.insertStringRecord(request) + ) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_record(self, device_id, timestamp, measurements, data_types, values): """ @@ -427,14 +543,18 @@ class Session(object): request = self.gen_insert_record_req( device_id, timestamp, measurements, data_types, values ) - status = self.__client.insertRecord(request) - logger.debug( - "insert one record to device {} message: {}".format( - device_id, status.message - ) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.insertRecord(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.insertRecord(request)) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_records( self, device_ids, times, measurements_lst, types_lst, values_lst @@ -455,14 +575,18 @@ class Session(object): request = self.gen_insert_records_req( device_ids, times, measurements_lst, type_values_lst, values_lst ) - status = self.__client.insertRecords(request) - logger.debug( - "insert multiple records to devices {} message: {}".format( - device_ids, status.message - ) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.insertRecords(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.insertRecords(request)) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_aligned_record( self, device_id, timestamp, measurements, data_types, values @@ -482,14 +606,18 @@ class Session(object): request = self.gen_insert_record_req( device_id, timestamp, measurements, data_types, values, True ) - status = self.__client.insertRecord(request) - logger.debug( - "insert one record to device {} message: {}".format( - device_id, status.message - ) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.insertRecord(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.insertRecord(request)) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_aligned_records( self, device_ids, times, measurements_lst, types_lst, values_lst @@ -510,14 +638,18 @@ class Session(object): request = self.gen_insert_records_req( device_ids, times, measurements_lst, type_values_lst, values_lst, True ) - status = self.__client.insertRecords(request) - logger.debug( - "insert multiple records to devices {} message: {}".format( - device_ids, status.message - ) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.insertRecords(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.insertRecords(request)) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def test_insert_record( self, device_id, timestamp, measurements, data_types, values @@ -535,14 +667,19 @@ class Session(object): request = self.gen_insert_record_req( device_id, timestamp, measurements, data_types, values ) - status = self.__client.testInsertRecord(request) - logger.debug( - "testing! insert one record to device {} message: {}".format( - device_id, status.message - ) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.testInsertRecord(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + return Session.verify_success( + self.__client.testInsertRecord(request) + ) + except TTransport.TException as e1: + logger.exception("test insert fails because: ", e1) + raise e1 + else: + raise e def test_insert_records( self, device_ids, times, measurements_lst, types_lst, values_lst @@ -563,12 +700,19 @@ class Session(object): request = self.gen_insert_records_req( device_ids, times, measurements_lst, type_values_lst, values_lst ) - status = self.__client.testInsertRecords(request) - logger.debug( - "testing! insert multiple records, message: {}".format(status.message) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.testInsertRecords(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + return Session.verify_success( + self.__client.testInsertRecords(request) + ) + except TTransport.TException as e1: + logger.exception("test insert fails because: ", e1) + raise e1 + else: + raise e def gen_insert_record_req( self, device_id, timestamp, measurements, data_types, values, is_aligned=False @@ -588,11 +732,11 @@ class Session(object): ) def gen_insert_str_record_req( - self, device_id, timestamp, measurements, data_types, values, is_aligned=False + self, device_id, timestamp, measurements, values, is_aligned=False ): - if (len(values) != len(data_types)) or (len(values) != len(measurements)): + if len(values) != len(measurements): raise RuntimeError( - "length of data types does not equal to length of values!" + "length of measurements does not equal to length of values!" ) return TSInsertStringRecordReq( self.__session_id, device_id, measurements, values, timestamp, is_aligned @@ -649,24 +793,38 @@ class Session(object): The tablet itself is sorted (see docs of Tablet.py) :param tablet: a tablet specified above """ - status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet)) - logger.debug( - "insert one tablet to device {} message: {}".format( - tablet.get_device_id(), status.message - ) - ) - - return Session.verify_success(status) + request = self.gen_insert_tablet_req(tablet) + try: + return Session.verify_success(self.__client.insertTablet(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.insertTablet(request)) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_tablets(self, tablet_lst): """ insert multiple tablets, tablets are independent to each other :param tablet_lst: List of tablets """ - status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst)) - logger.debug("insert multiple tablets, message: {}".format(status.message)) - - return Session.verify_success(status) + request = self.gen_insert_tablets_req(tablet_lst) + try: + return Session.verify_success(self.__client.insertTablets(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.insertTablets(request)) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_aligned_tablet(self, tablet): """ @@ -680,26 +838,38 @@ class Session(object): The tablet itself is sorted (see docs of Tablet.py) :param tablet: a tablet specified above """ - status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet, True)) - logger.debug( - "insert one tablet to device {} message: {}".format( - tablet.get_device_id(), status.message - ) - ) - - return Session.verify_success(status) + request = self.gen_insert_tablet_req(tablet, True) + try: + return Session.verify_success(self.__client.insertTablet(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.insertTablet(request)) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_aligned_tablets(self, tablet_lst): """ insert multiple aligned tablets, tablets are independent to each other :param tablet_lst: List of tablets """ - status = self.__client.insertTablets( - self.gen_insert_tablets_req(tablet_lst, True) - ) - logger.debug("insert multiple tablets, message: {}".format(status.message)) - - return Session.verify_success(status) + request = self.gen_insert_tablets_req(tablet_lst, True) + try: + return Session.verify_success(self.__client.insertTablets(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.insertTablets(request)) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_records_of_one_device( self, device_id, times_list, measurements_list, types_list, values_list @@ -752,12 +922,22 @@ class Session(object): request = self.gen_insert_records_of_one_device_request( device_id, times_list, measurements_list, values_list, types_list ) - - # send request - status = self.__client.insertRecordsOfOneDevice(request) - logger.debug("insert records of one device, message: {}".format(status.message)) - - return Session.verify_success(status) + try: + return Session.verify_success( + self.__client.insertRecordsOfOneDevice(request) + ) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.insertRecordsOfOneDevice(request) + ) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_aligned_records_of_one_device( self, device_id, times_list, measurements_list, types_list, values_list @@ -811,10 +991,22 @@ class Session(object): ) # send request - status = self.__client.insertRecordsOfOneDevice(request) - logger.debug("insert records of one device, message: {}".format(status.message)) - - return Session.verify_success(status) + try: + return Session.verify_success( + self.__client.insertRecordsOfOneDevice(request) + ) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.insertRecordsOfOneDevice(request) + ) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def gen_insert_records_of_one_device_request( self, @@ -852,14 +1044,21 @@ class Session(object): should be used to test other time cost in client :param tablet: a tablet of data """ - status = self.__client.testInsertTablet(self.gen_insert_tablet_req(tablet)) - logger.debug( - "testing! insert one tablet to device {} message: {}".format( - tablet.get_device_id(), status.message - ) - ) - - return Session.verify_success(status) + try: + request = self.gen_insert_tablet_req(tablet) + return Session.verify_success(self.__client.testInsertTablet(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.testInsertTablet(request) + ) + except TTransport.TException as e1: + logger.exception("test insert fails because: ", e1) + raise e1 + else: + raise e def test_insert_tablets(self, tablet_list): """ @@ -867,14 +1066,20 @@ class Session(object): should be used to test other time cost in client :param tablet_list: List of tablets """ - status = self.__client.testInsertTablets( - self.gen_insert_tablets_req(tablet_list) - ) - logger.debug( - "testing! insert multiple tablets, message: {}".format(status.message) - ) - - return Session.verify_success(status) + try: + request = self.gen_insert_tablets_req(tablet_list) + return Session.verify_success(self.__client.testInsertTablets(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + return Session.verify_success( + self.__client.testInsertTablets(request) + ) + except TTransport.TException as e1: + logger.exception("test insert fails because: ", e1) + raise e1 + else: + raise e def gen_insert_tablet_req(self, tablet, is_aligned=False): data_type_values = [data_type.value for data_type in tablet.get_data_types()] @@ -926,19 +1131,43 @@ class Session(object): request = TSExecuteStatementReq( self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout ) - resp = self.__client.executeQueryStatement(request) - return SessionDataSet( - sql, - resp.columns, - resp.dataTypeList, - resp.columnNameIndexMap, - resp.queryId, - self.__client, - self.__statement_id, - self.__session_id, - resp.queryDataSet, - resp.ignoreTimeStamp, - ) + try: + resp = self.__client.executeQueryStatement(request) + return SessionDataSet( + sql, + resp.columns, + resp.dataTypeList, + resp.columnNameIndexMap, + resp.queryId, + self.__client, + self.__statement_id, + self.__session_id, + resp.queryDataSet, + resp.ignoreTimeStamp, + ) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + request.statementId = self.__statement_id + resp = self.__client.executeQueryStatement(request) + return SessionDataSet( + sql, + resp.columns, + resp.dataTypeList, + resp.columnNameIndexMap, + resp.queryId, + self.__client, + self.__statement_id, + self.__session_id, + resp.queryDataSet, + resp.ignoreTimeStamp, + ) + except TTransport.TException as e1: + logger.exception("execution of query statement fails because: ", e1) + raise e1 + else: + raise e def execute_non_query_statement(self, sql): """ @@ -949,12 +1178,22 @@ class Session(object): try: resp = self.__client.executeUpdateStatement(request) status = resp.status - logger.debug( - "execute non-query statement {} message: {}".format(sql, status.message) - ) return Session.verify_success(status) except TTransport.TException as e: - raise RuntimeError("execution of non-query statement fails because: ", e) + if self.reconnect(): + try: + request.sessionId = self.__session_id + request.statementId = self.__statement_id + resp = self.__client.executeUpdateStatement(request) + status = resp.status + return Session.verify_success(status) + except TTransport.TException as e1: + logger.exception( + "execution of non-query statement fails because: ", e1 + ) + raise e1 + else: + raise e @staticmethod def value_to_bytes(data_types, values): @@ -1075,7 +1314,32 @@ class Session(object): statementId=self.__statement_id, enableRedirectQuery=False, ) - resp = self.__client.executeRawDataQuery(request) + try: + resp = self.__client.executeRawDataQuery(request) + return SessionDataSet( + "", + resp.columns, + resp.dataTypeList, + resp.columnNameIndexMap, + resp.queryId, + self.__client, + self.__statement_id, + self.__session_id, + resp.queryDataSet, + resp.ignoreTimeStamp, + ) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + request.statementId = self.__statement_id + resp = self.__client.executeRawDataQuery(request) + except TTransport.TException as e1: + logger.exception("execution of query statement fails because: ", e1) + raise e1 + else: + raise e + Session.verify_success(resp.status) return SessionDataSet( "", resp.columns, @@ -1104,8 +1368,32 @@ class Session(object): self.__statement_id, enableRedirectQuery=False, ) - - resp = self.__client.executeLastDataQuery(request) + try: + resp = self.__client.executeLastDataQuery(request) + return SessionDataSet( + "", + resp.columns, + resp.dataTypeList, + resp.columnNameIndexMap, + resp.queryId, + self.__client, + self.__statement_id, + self.__session_id, + resp.queryDataSet, + resp.ignoreTimeStamp, + ) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + request.statementId = self.__statement_id + resp = self.__client.executeLastDataQuery(request) + except TTransport.TException as e1: + logger.exception("execution of query statement fails because: ", e1) + raise e1 + else: + raise e + Session.verify_success(resp.status) return SessionDataSet( "", resp.columns, @@ -1118,3 +1406,131 @@ class Session(object): resp.queryDataSet, resp.ignoreTimeStamp, ) + + def insert_string_records_of_one_device( + self, + device_id: str, + times: list, + measurements_list: list, + values_list: list, + have_sorted: bool = False, + ): + """ + insert multiple row of string record into database: + timestamp, m1, m2, m3 + 0, text1, text2, text3 + :param device_id: String, device id + :param times: Timestamp list + :param measurements_list: Measurements list + :param values_list: Value list + :param have_sorted: have these list been sorted by timestamp + """ + if (len(times) != len(measurements_list)) or (len(times) != len(values_list)): + raise RuntimeError( + "insert records of one device error: times, measurementsList and valuesList's size should be equal!" + ) + request = self.gen_insert_string_records_of_one_device_request( + device_id, times, measurements_list, values_list, have_sorted, False + ) + try: + return Session.verify_success( + self.__client.insertStringRecordsOfOneDevice(request) + ) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.insertStringRecordsOfOneDevice(request) + ) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e + + def insert_aligned_string_records_of_one_device( + self, + device_id: str, + times: list, + measurements_list: list, + values: list, + have_sorted: bool = False, + ): + if (len(times) != len(measurements_list)) or (len(times) != len(values)): + raise RuntimeError( + "insert records of one device error: times, measurementsList and valuesList's size should be equal!" + ) + request = self.gen_insert_string_records_of_one_device_request( + device_id, times, measurements_list, values, have_sorted, True + ) + try: + return Session.verify_success( + self.__client.insertStringRecordsOfOneDevice(request) + ) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.insertStringRecordsOfOneDevice(request) + ) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e + + def reconnect(self): + if self.__hosts is None: + return False + connected = False + for i in range(1, self.RETRY_NUM + 1): + if self.__transport is not None: + self.__transport.close() + curr_host_index = random.randint(0, len(self.__hosts)) + try_host_num = 0 + for j in range(curr_host_index, len(self.__hosts)): + if try_host_num == len(self.__hosts): + break + self.__default_endpoint = EndPoint(self.__hosts[j], self.__ports[j]) + if j == len(self.__hosts) - 1: + j = -1 + try_host_num += 1 + try: + self.init_connection(self.__default_endpoint) + connected = True + except TTransport.TException as e: + continue + break + if connected: + break + return connected + + def gen_insert_string_records_of_one_device_request( + self, + device_id, + times, + measurements_list, + values_list, + have_sorted, + is_aligned=False, + ): + if (len(times) != len(measurements_list)) or (len(times) != len(values_list)): + raise RuntimeError( + "insert records of one device error: times, measurementsList and valuesList's size should be equal!" + ) + if not Session.check_sorted(times): + # sort by timestamp + sorted_zipped = sorted(zip(times, measurements_list, values_list)) + result = zip(*sorted_zipped) + times_list, measurements_list, values_list = [list(x) for x in result] + request = TSInsertStringRecordsOfOneDeviceReq( + self.__session_id, + device_id, + measurements_list, + values_list, + times, + is_aligned, + ) + return request diff --git a/client-py/tests/test_dataframe.py b/client-py/tests/test_dataframe.py index f957c2da42..05aa8eb141 100644 --- a/client-py/tests/test_dataframe.py +++ b/client-py/tests/test_dataframe.py @@ -67,11 +67,12 @@ def test_non_time_query(): "tags", "attributes", "deadband", - "deadband parameters" + "deadband parameters", ] assert_array_equal( df.values, - [[ + [ + [ "root.device1.pressure", None, "root.device1", diff --git a/client-py/tests/test_delete_data.py b/client-py/tests/test_delete_data.py index 7e9df9ac7e..b0a9f014c2 100644 --- a/client-py/tests/test_delete_data.py +++ b/client-py/tests/test_delete_data.py @@ -114,7 +114,9 @@ def test_delete_date(): print_message("insert aligned record of one device failed") # execute delete data - session.delete_data(["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1) + session.delete_data( + ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1 + ) # execute raw data query sql statement session_data_set = session.execute_query_statement( @@ -140,7 +142,9 @@ def test_delete_date(): assert actual_count == expect_count # execute delete data - session.delete_data_in_range(["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 2, 3) + session.delete_data_in_range( + ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 2, 3 + ) # execute raw data query sql statement session_data_set = session.execute_query_statement( @@ -175,4 +179,3 @@ else: print("Some test failed, please have a check") print("failed count: ", failed_count) exit(1) - diff --git a/docs/UserGuide/API/Programming-Python-Native-API.md b/docs/UserGuide/API/Programming-Python-Native-API.md index 4c4ee5e9c6..99ac94cfe6 100644 --- a/docs/UserGuide/API/Programming-Python-Native-API.md +++ b/docs/UserGuide/API/Programming-Python-Native-API.md @@ -59,7 +59,26 @@ session.close() * Initialize a Session ```python -session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") +session = Session( + ip="127.0.0.1", + port="6667", + user="root", + password="root", + fetch_size=1024, + zone_id="UTC+8" +) +``` + +* Initialize a Session to connect multiple nodes + +```python +session = Session.init_from_node_urls( + node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"], + user="root", + password="root", + fetch_size=1024, + zone_id="UTC+8" +) ``` * Open a session, with a parameter to specify whether to enable RPC compression diff --git a/docs/zh/UserGuide/API/Programming-Python-Native-API.md b/docs/zh/UserGuide/API/Programming-Python-Native-API.md index 5038e023b1..7df7994abf 100644 --- a/docs/zh/UserGuide/API/Programming-Python-Native-API.md +++ b/docs/zh/UserGuide/API/Programming-Python-Native-API.md @@ -60,7 +60,26 @@ session.close() * 初始化 Session ```python -session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") +session = Session( + ip="127.0.0.1", + port="6667", + user="root", + password="root", + fetch_size=1024, + zone_id="UTC+8" +) +``` + +* 初始化可连接多节点的 Session + +```python +session = Session.init_from_node_urls( + node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"], + user="root", + password="root", + fetch_size=1024, + zone_id="UTC+8" +) ``` * 开启 Session,并决定是否开启 RPC 压缩
