This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch py_reconnect in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0ed95d9efe8dfff8fd7768ca13ec31d9849a20e1 Author: HTHou <[email protected]> AuthorDate: Tue Mar 21 18:24:39 2023 +0800 Support connecting multiple nodes in python api --- client-py/SessionExample.py | 7 +- client-py/iotdb/Session.py | 800 ++++++++++++++++------- client-py/iotdb/utils/IoTDBRpcDataSet.py | 6 +- client-py/iotdb/utils/NumpyTablet.py | 4 +- client-py/tests/tablet_performance_comparison.py | 4 +- client-py/tests/test_dataframe.py | 6 +- client-py/tests/test_delete_data.py | 8 +- client-py/tests/test_numpy_tablet.py | 7 +- client-py/tests/test_session.py | 7 +- client-py/tests/test_template.py | 12 +- 10 files changed, 608 insertions(+), 253 deletions(-) diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py index 1323bf4ef1..5e24e3c41d 100644 --- a/client-py/SessionExample.py +++ b/client-py/SessionExample.py @@ -239,7 +239,12 @@ np_bitmaps_[2].mark(2) np_bitmaps_[4].mark(3) np_bitmaps_[5].mark(3) np_tablet_with_none = NumpyTablet( - "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_, np_bitmaps_ + "root.sg_test_01.d_02", + measurements_, + data_types_, + np_values_, + np_timestamps_, + np_bitmaps_, ) session.insert_tablet(np_tablet_with_none) diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py index 1a4c41326d..56359ae3fd 100644 --- a/client-py/iotdb/Session.py +++ b/client-py/iotdb/Session.py @@ -16,6 +16,7 @@ # under the License. # import logging +import random import struct import time from thrift.protocol import TBinaryProtocol, TCompactProtocol @@ -24,6 +25,7 @@ from thrift.transport import TSocket, TTransport from iotdb.utils.SessionDataSet import SessionDataSet from .template.Template import Template from .template.TemplateQueryType import TemplateQueryType +from .thrift.common.ttypes import TEndPoint from .thrift.rpc.IClientRPCService import ( Client, TSCreateTimeseriesReq, @@ -54,6 +56,7 @@ from .thrift.rpc.ttypes import ( TSLastDataQueryReq, TSInsertStringRecordsOfOneDeviceReq, ) + # for debug # from IoTDBConstants import * # from SessionDataSet import SessionDataSet @@ -72,23 +75,34 @@ logger = logging.getLogger("IoTDB") class Session(object): SUCCESS_STATUS = 200 + MULTIPLE_ERROR = 302 REDIRECTION_RECOMMEND = 400 DEFAULT_FETCH_SIZE = 10000 DEFAULT_USER = "root" DEFAULT_PASSWORD = "root" DEFAULT_ZONE_ID = time.strftime("%z") + RETRY_NUM = 3 def __init__( self, - host, - port, + hosts, + ports, user=DEFAULT_USER, password=DEFAULT_PASSWORD, fetch_size=DEFAULT_FETCH_SIZE, zone_id=DEFAULT_ZONE_ID, ): - self.__host = host - self.__port = port + if isinstance(hosts, list): + self.__hosts = hosts + self.__host = hosts[0] + else: + self.__host = hosts + if isinstance(ports, list): + self.__ports = ports + self.__port = ports[0] + else: + self.__port = ports + self.__default_endpoint = TEndPoint(self.__host, self.__port) self.__user = user self.__password = password self.__fetch_size = fetch_size @@ -99,12 +113,17 @@ class Session(object): self.__session_id = None self.__statement_id = None self.__zone_id = zone_id + self.__enable_rpc_compression = None def open(self, enable_rpc_compression=False): + self.__enable_rpc_compression = enable_rpc_compression + self.init(self.__default_endpoint) + + def init(self, endpoint): if not self.__is_close: return self.__transport = TTransport.TFramedTransport( - TSocket.TSocket(self.__host, self.__port) + TSocket.TSocket(endpoint.ip, endpoint.port) ) if not self.__transport.isOpen(): @@ -112,8 +131,9 @@ class Session(object): self.__transport.open() except TTransport.TTransportException as e: logger.exception("TTransportException!", exc_info=e) + raise e - if enable_rpc_compression: + if self.__enable_rpc_compression: self.__client = Client(TCompactProtocol.TCompactProtocol(self.__transport)) else: self.__client = Client(TBinaryProtocol.TBinaryProtocol(self.__transport)) @@ -145,6 +165,7 @@ class Session(object): except Exception as e: self.__transport.close() logger.exception("session closed because: ", exc_info=e) + raise e if self.__zone_id is not None: self.set_time_zone(self.__zone_id) @@ -177,12 +198,21 @@ class Session(object): create one database :param group_name: String, database name (starts from root) """ - status = self.__client.setStorageGroup(self.__session_id, group_name) - logger.debug( - "setting database {} message: {}".format(group_name, status.message) - ) - - return Session.verify_success(status) + try: + return Session.verify_success( + self.__client.setStorageGroup(self.__session_id, group_name) + ) + except TTransport.TException as e: + if self.reconnect(): + try: + return Session.verify_success( + self.__client.setStorageGroup(self.__session_id, group_name) + ) + except TTransport.TException as e1: + logger.exception("create databases fails because: ", e1) + raise e1 + else: + raise e def delete_storage_group(self, storage_group): """ @@ -197,14 +227,23 @@ class Session(object): delete multiple databases. :param storage_group_lst: List, paths of the target databases. """ - status = self.__client.deleteStorageGroups(self.__session_id, storage_group_lst) - logger.debug( - "delete database(s) {} message: {}".format( - storage_group_lst, status.message + try: + return Session.verify_success( + self.__client.deleteStorageGroups(self.__session_id, storage_group_lst) ) - ) - - return Session.verify_success(status) + except TTransport.TException as e: + if self.reconnect(): + try: + return Session.verify_success( + self.__client.deleteStorageGroups( + self.__session_id, storage_group_lst + ) + ) + except TTransport.TException as e1: + logger.exception("delete database fails because: ", e1) + raise e1 + else: + raise e def create_time_series( self, @@ -242,12 +281,20 @@ class Session(object): attributes, alias, ) - status = self.__client.createTimeseries(request) - logger.debug( - "creating time series {} message: {}".format(ts_path, status.message) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.createTimeseries(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.createTimeseries(request) + ) + except TTransport.TException as e1: + logger.exception("creating time series fails because: ", e1) + raise e1 + else: + raise e def create_aligned_time_series( self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst @@ -272,14 +319,21 @@ class Session(object): encoding_lst, compressor_lst, ) - status = self.__client.createAlignedTimeseries(request) - logger.debug( - "creating aligned time series of device {} message: {}".format( - measurements_lst, status.message + try: + return Session.verify_success( + self.__client.createAlignedTimeseries(request) ) - ) - - return Session.verify_success(status) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.createAlignedTimeseries(request) + ) + except TTransport.TException as e1: + logger.exception("creating time series fails because: ", e1) + raise e1 + raise e def create_multi_time_series( self, @@ -318,28 +372,39 @@ class Session(object): attributes_lst, alias_lst, ) - status = self.__client.createMultiTimeseries(request) - logger.debug( - "creating multiple time series {} message: {}".format( - ts_path_lst, status.message - ) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.createMultiTimeseries(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.createMultiTimeseries(request) + ) + except TTransport.TException as e1: + logger.exception("creating multi time series fails because: ", e1) + raise e1 + else: + raise e def delete_time_series(self, paths_list): """ delete multiple time series, including data and schema :param paths_list: List of time series path, which should be complete (starts from root) """ - status = self.__client.deleteTimeseries(self.__session_id, paths_list) - logger.debug( - "deleting multiple time series {} message: {}".format( - paths_list, status.message + try: + return Session.verify_success( + self.__client.deleteTimeseries(self.__session_id, paths_list) ) - ) - - return Session.verify_success(status) + except TTransport.TException: + if self.reconnect(): + try: + return Session.verify_success( + self.__client.deleteTimeseries(self.__session_id, paths_list) + ) + except TTransport.TException as e1: + logger.exception("deleting time series fails because: ", e1) + raise e1 def check_time_series_exists(self, path): """ @@ -358,14 +423,21 @@ class Session(object): :param paths_list: time series list that the data in. :param end_time: data with time stamp less than or equal to time will be deleted. """ - request = TSDeleteDataReq(self.__session_id, paths_list, -9223372036854775808, end_time) + request = TSDeleteDataReq( + self.__session_id, paths_list, -9223372036854775808, end_time + ) try: - status = self.__client.deleteData(request) - logger.debug( - "delete data from {}, message: {}".format(paths_list, status.message) - ) + return Session.verify_success(self.__client.deleteData(request)) except TTransport.TException as e: - logger.exception("data deletion fails because: ", e) + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.deleteData(request)) + except TTransport.TException as e1: + logger.exception("data deletion fails because: ", e1) + raise e1 + else: + raise e def delete_data_in_range(self, paths_list, start_time, end_time): """ @@ -376,12 +448,17 @@ class Session(object): """ request = TSDeleteDataReq(self.__session_id, paths_list, start_time, end_time) try: - status = self.__client.deleteData(request) - logger.debug( - "delete data from {}, message: {}".format(paths_list, status.message) - ) + return Session.verify_success(self.__client.deleteData(request)) except TTransport.TException as e: - logger.exception("data deletion fails because: ", e) + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.deleteData(request)) + except TTransport.TException as e1: + logger.exception("data deletion fails because: ", e1) + raise e1 + else: + raise e def insert_str_record(self, device_id, timestamp, measurements, string_values): """special case for inserting one row of String (TEXT) value""" @@ -389,18 +466,23 @@ class Session(object): string_values = [string_values] if type(measurements) == str: measurements = [measurements] - data_types = [TSDataType.TEXT.value for _ in string_values] request = self.gen_insert_str_record_req( - device_id, timestamp, measurements, data_types, string_values + device_id, timestamp, measurements, string_values ) - status = self.__client.insertStringRecord(request) - logger.debug( - "insert one record to device {} message: {}".format( - device_id, status.message - ) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.insertStringRecord(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.insertStringRecord(request) + ) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_aligned_str_record( self, device_id, timestamp, measurements, string_values @@ -410,18 +492,23 @@ class Session(object): string_values = [string_values] if type(measurements) == str: measurements = [measurements] - data_types = [TSDataType.TEXT.value for _ in string_values] request = self.gen_insert_str_record_req( - device_id, timestamp, measurements, data_types, string_values, True - ) - status = self.__client.insertStringRecord(request) - logger.debug( - "insert one record to device {} message: {}".format( - device_id, status.message - ) + device_id, timestamp, measurements, string_values, True ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.insertStringRecord(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.insertStringRecord(request) + ) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_record(self, device_id, timestamp, measurements, data_types, values): """ @@ -439,14 +526,18 @@ class Session(object): request = self.gen_insert_record_req( device_id, timestamp, measurements, data_types, values ) - status = self.__client.insertRecord(request) - logger.debug( - "insert one record to device {} message: {}".format( - device_id, status.message - ) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.insertRecord(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.insertRecord(request)) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_records( self, device_ids, times, measurements_lst, types_lst, values_lst @@ -467,14 +558,18 @@ class Session(object): request = self.gen_insert_records_req( device_ids, times, measurements_lst, type_values_lst, values_lst ) - status = self.__client.insertRecords(request) - logger.debug( - "insert multiple records to devices {} message: {}".format( - device_ids, status.message - ) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.insertRecords(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.insertRecords(request)) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_aligned_record( self, device_id, timestamp, measurements, data_types, values @@ -494,14 +589,18 @@ class Session(object): request = self.gen_insert_record_req( device_id, timestamp, measurements, data_types, values, True ) - status = self.__client.insertRecord(request) - logger.debug( - "insert one record to device {} message: {}".format( - device_id, status.message - ) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.insertRecord(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.insertRecord(request)) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_aligned_records( self, device_ids, times, measurements_lst, types_lst, values_lst @@ -522,14 +621,18 @@ class Session(object): request = self.gen_insert_records_req( device_ids, times, measurements_lst, type_values_lst, values_lst, True ) - status = self.__client.insertRecords(request) - logger.debug( - "insert multiple records to devices {} message: {}".format( - device_ids, status.message - ) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.insertRecords(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.insertRecords(request)) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def test_insert_record( self, device_id, timestamp, measurements, data_types, values @@ -547,14 +650,19 @@ class Session(object): request = self.gen_insert_record_req( device_id, timestamp, measurements, data_types, values ) - status = self.__client.testInsertRecord(request) - logger.debug( - "testing! insert one record to device {} message: {}".format( - device_id, status.message - ) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.testInsertRecord(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + return Session.verify_success( + self.__client.testInsertRecord(request) + ) + except TTransport.TException as e1: + logger.exception("test insert fails because: ", e1) + raise e1 + else: + raise e def test_insert_records( self, device_ids, times, measurements_lst, types_lst, values_lst @@ -575,12 +683,19 @@ class Session(object): request = self.gen_insert_records_req( device_ids, times, measurements_lst, type_values_lst, values_lst ) - status = self.__client.testInsertRecords(request) - logger.debug( - "testing! insert multiple records, message: {}".format(status.message) - ) - - return Session.verify_success(status) + try: + return Session.verify_success(self.__client.testInsertRecords(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + return Session.verify_success( + self.__client.testInsertRecords(request) + ) + except TTransport.TException as e1: + logger.exception("test insert fails because: ", e1) + raise e1 + else: + raise e def gen_insert_record_req( self, device_id, timestamp, measurements, data_types, values, is_aligned=False @@ -600,11 +715,11 @@ class Session(object): ) def gen_insert_str_record_req( - self, device_id, timestamp, measurements, data_types, values, is_aligned=False + self, device_id, timestamp, measurements, values, is_aligned=False ): - if (len(values) != len(data_types)) or (len(values) != len(measurements)): + if len(values) != len(measurements): raise RuntimeError( - "length of data types does not equal to length of values!" + "length of measurements does not equal to length of values!" ) return TSInsertStringRecordReq( self.__session_id, device_id, measurements, values, timestamp, is_aligned @@ -661,24 +776,38 @@ class Session(object): The tablet itself is sorted (see docs of Tablet.py) :param tablet: a tablet specified above """ - status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet)) - logger.debug( - "insert one tablet to device {} message: {}".format( - tablet.get_device_id(), status.message - ) - ) - - return Session.verify_success(status) + request = self.gen_insert_tablet_req(tablet) + try: + return Session.verify_success(self.__client.insertTablet(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.insertTablet(request)) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_tablets(self, tablet_lst): """ insert multiple tablets, tablets are independent to each other :param tablet_lst: List of tablets """ - status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst)) - logger.debug("insert multiple tablets, message: {}".format(status.message)) - - return Session.verify_success(status) + request = self.gen_insert_tablets_req(tablet_lst) + try: + return Session.verify_success(self.__client.insertTablets(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.insertTablets(request)) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_aligned_tablet(self, tablet): """ @@ -692,26 +821,38 @@ class Session(object): The tablet itself is sorted (see docs of Tablet.py) :param tablet: a tablet specified above """ - status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet, True)) - logger.debug( - "insert one tablet to device {} message: {}".format( - tablet.get_device_id(), status.message - ) - ) - - return Session.verify_success(status) + request = self.gen_insert_tablet_req(tablet, True) + try: + return Session.verify_success(self.__client.insertTablet(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.insertTablet(request)) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_aligned_tablets(self, tablet_lst): """ insert multiple aligned tablets, tablets are independent to each other :param tablet_lst: List of tablets """ - status = self.__client.insertTablets( - self.gen_insert_tablets_req(tablet_lst, True) - ) - logger.debug("insert multiple tablets, message: {}".format(status.message)) - - return Session.verify_success(status) + request = self.gen_insert_tablets_req(tablet_lst, True) + try: + return Session.verify_success(self.__client.insertTablets(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success(self.__client.insertTablets(request)) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_records_of_one_device( self, device_id, times_list, measurements_list, types_list, values_list @@ -764,12 +905,22 @@ class Session(object): request = self.gen_insert_records_of_one_device_request( device_id, times_list, measurements_list, values_list, types_list ) - - # send request - status = self.__client.insertRecordsOfOneDevice(request) - logger.debug("insert records of one device, message: {}".format(status.message)) - - return Session.verify_success(status) + try: + return Session.verify_success( + self.__client.insertRecordsOfOneDevice(request) + ) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.insertRecordsOfOneDevice(request) + ) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_aligned_records_of_one_device( self, device_id, times_list, measurements_list, types_list, values_list @@ -823,10 +974,22 @@ class Session(object): ) # send request - status = self.__client.insertRecordsOfOneDevice(request) - logger.debug("insert records of one device, message: {}".format(status.message)) - - return Session.verify_success(status) + try: + return Session.verify_success( + self.__client.insertRecordsOfOneDevice(request) + ) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.insertRecordsOfOneDevice(request) + ) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def gen_insert_records_of_one_device_request( self, @@ -864,14 +1027,21 @@ class Session(object): should be used to test other time cost in client :param tablet: a tablet of data """ - status = self.__client.testInsertTablet(self.gen_insert_tablet_req(tablet)) - logger.debug( - "testing! insert one tablet to device {} message: {}".format( - tablet.get_device_id(), status.message - ) - ) - - return Session.verify_success(status) + try: + request = self.gen_insert_tablet_req(tablet) + return Session.verify_success(self.__client.testInsertTablet(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.testInsertTablet(request) + ) + except TTransport.TException as e1: + logger.exception("test insert fails because: ", e1) + raise e1 + else: + raise e def test_insert_tablets(self, tablet_list): """ @@ -879,14 +1049,20 @@ class Session(object): should be used to test other time cost in client :param tablet_list: List of tablets """ - status = self.__client.testInsertTablets( - self.gen_insert_tablets_req(tablet_list) - ) - logger.debug( - "testing! insert multiple tablets, message: {}".format(status.message) - ) - - return Session.verify_success(status) + try: + request = self.gen_insert_tablets_req(tablet_list) + return Session.verify_success(self.__client.testInsertTablets(request)) + except TTransport.TException as e: + if self.reconnect(): + try: + return Session.verify_success( + self.__client.testInsertTablets(request) + ) + except TTransport.TException as e1: + logger.exception("test insert fails because: ", e1) + raise e1 + else: + raise e def gen_insert_tablet_req(self, tablet, is_aligned=False): data_type_values = [data_type.value for data_type in tablet.get_data_types()] @@ -938,19 +1114,43 @@ class Session(object): request = TSExecuteStatementReq( self.__session_id, sql, self.__statement_id, self.__fetch_size, timeout ) - resp = self.__client.executeQueryStatement(request) - return SessionDataSet( - sql, - resp.columns, - resp.dataTypeList, - resp.columnNameIndexMap, - resp.queryId, - self.__client, - self.__statement_id, - self.__session_id, - resp.queryDataSet, - resp.ignoreTimeStamp, - ) + try: + resp = self.__client.executeQueryStatement(request) + return SessionDataSet( + sql, + resp.columns, + resp.dataTypeList, + resp.columnNameIndexMap, + resp.queryId, + self.__client, + self.__statement_id, + self.__session_id, + resp.queryDataSet, + resp.ignoreTimeStamp, + ) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + request.statementId = self.__statement_id + resp = self.__client.executeQueryStatement(request) + return SessionDataSet( + sql, + resp.columns, + resp.dataTypeList, + resp.columnNameIndexMap, + resp.queryId, + self.__client, + self.__statement_id, + self.__session_id, + resp.queryDataSet, + resp.ignoreTimeStamp, + ) + except TTransport.TException as e1: + logger.exception("execution of query statement fails because: ", e1) + raise e1 + else: + raise e def execute_non_query_statement(self, sql): """ @@ -961,12 +1161,22 @@ class Session(object): try: resp = self.__client.executeUpdateStatement(request) status = resp.status - logger.debug( - "execute non-query statement {} message: {}".format(sql, status.message) - ) return Session.verify_success(status) except TTransport.TException as e: - raise RuntimeError("execution of non-query statement fails because: ", e) + if self.reconnect(): + try: + request.sessionId = self.__session_id + request.statementId = self.__statement_id + resp = self.__client.executeUpdateStatement(request) + status = resp.status + return Session.verify_success(status) + except TTransport.TException as e1: + logger.exception( + "execution of non-query statement fails because: ", e1 + ) + raise e1 + else: + raise e @staticmethod def value_to_bytes(data_types, values): @@ -1050,13 +1260,32 @@ class Session(object): verify success of operation :param status: execution result status """ - if status.code == Session.SUCCESS_STATUS or status.code == Session.REDIRECTION_RECOMMEND: + if status.code == Session.MULTIPLE_ERROR: + Session.verify_success_by_list(status.subStatus) + return 0 + if ( + status.code == Session.SUCCESS_STATUS + or status.code == Session.REDIRECTION_RECOMMEND + ): return 0 logger.error("error status is %s", status) - raise RuntimeError( - "execution of statement fails because: " + status.message - ) + raise RuntimeError(status.code + ": " + status.message) + + @staticmethod + def verify_success_by_list(status_list): + """ + verify success of operation + :param status_list: execution result status + """ + message = str(Session.MULTIPLE_ERROR) + ": " + for status in status_list: + if ( + status.code != Session.SUCCESS_STATUS + and status.code != Session.REDIRECTION_RECOMMEND + ): + message += status.message + "; " + raise RuntimeError(message) def execute_raw_data_query( self, paths: list, start_time: int, end_time: int @@ -1077,19 +1306,43 @@ class Session(object): statementId=self.__statement_id, enableRedirectQuery=False, ) - resp = self.__client.executeRawDataQuery(request) - return SessionDataSet( - "", - resp.columns, - resp.dataTypeList, - resp.columnNameIndexMap, - resp.queryId, - self.__client, - self.__statement_id, - self.__session_id, - resp.queryDataSet, - resp.ignoreTimeStamp, - ) + try: + resp = self.__client.executeRawDataQuery(request) + return SessionDataSet( + "", + resp.columns, + resp.dataTypeList, + resp.columnNameIndexMap, + resp.queryId, + self.__client, + self.__statement_id, + self.__session_id, + resp.queryDataSet, + resp.ignoreTimeStamp, + ) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + request.statementId = self.__statement_id + resp = self.__client.executeRawDataQuery(request) + return SessionDataSet( + "", + resp.columns, + resp.dataTypeList, + resp.columnNameIndexMap, + resp.queryId, + self.__client, + self.__statement_id, + self.__session_id, + resp.queryDataSet, + resp.ignoreTimeStamp, + ) + except TTransport.TException as e1: + logger.exception("execution of query statement fails because: ", e1) + raise e1 + else: + raise e def execute_last_data_query(self, paths: list, last_time: int) -> SessionDataSet: """ @@ -1106,20 +1359,43 @@ class Session(object): self.__statement_id, enableRedirectQuery=False, ) - - resp = self.__client.executeLastDataQuery(request) - return SessionDataSet( - "", - resp.columns, - resp.dataTypeList, - resp.columnNameIndexMap, - resp.queryId, - self.__client, - self.__statement_id, - self.__session_id, - resp.queryDataSet, - resp.ignoreTimeStamp, - ) + try: + resp = self.__client.executeLastDataQuery(request) + return SessionDataSet( + "", + resp.columns, + resp.dataTypeList, + resp.columnNameIndexMap, + resp.queryId, + self.__client, + self.__statement_id, + self.__session_id, + resp.queryDataSet, + resp.ignoreTimeStamp, + ) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + request.statementId = self.__statement_id + resp = self.__client.executeLastDataQuery(request) + return SessionDataSet( + "", + resp.columns, + resp.dataTypeList, + resp.columnNameIndexMap, + resp.queryId, + self.__client, + self.__statement_id, + self.__session_id, + resp.queryDataSet, + resp.ignoreTimeStamp, + ) + except TTransport.TException as e1: + logger.exception("execution of query statement fails because: ", e1) + raise e1 + else: + raise e def insert_string_records_of_one_device( self, @@ -1146,12 +1422,22 @@ class Session(object): request = self.gen_insert_string_records_of_one_device_request( device_id, times, measurements_list, values_list, have_sorted, False ) - status = self.__client.insertStringRecordsOfOneDevice(request) - logger.debug( - "insert one device {} message: {}".format(device_id, status.message) - ) - - return Session.verify_success(status) + try: + return Session.verify_success( + self.__client.insertStringRecordsOfOneDevice(request) + ) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.insertStringRecordsOfOneDevice(request) + ) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e def insert_aligned_string_records_of_one_device( self, @@ -1168,12 +1454,50 @@ class Session(object): request = self.gen_insert_string_records_of_one_device_request( device_id, times, measurements_list, values, have_sorted, True ) - status = self.__client.insertStringRecordsOfOneDevice(request) - logger.debug( - "insert one device {} message: {}".format(device_id, status.message) - ) + try: + return Session.verify_success( + self.__client.insertStringRecordsOfOneDevice(request) + ) + except TTransport.TException as e: + if self.reconnect(): + try: + request.sessionId = self.__session_id + return Session.verify_success( + self.__client.insertStringRecordsOfOneDevice(request) + ) + except TTransport.TException as e1: + logger.exception("insert fails because: ", e1) + raise e1 + else: + raise e - return Session.verify_success(status) + def reconnect(self): + if self.__hosts is None: + return False + connected = False + for i in range(1, self.RETRY_NUM): + if self.__transport is not None: + self.__transport.close() + curr_host_index = random.randint(0, len(self.__hosts)) + try_host_num = 0 + for j in range(curr_host_index, len(self.__hosts)): + if try_host_num == len(self.__hosts): + break + self.__default_endpoint = TEndPoint( + self.__hosts[j], self.__ports[j] + ) + if j == len(self.__hosts) - 1: + j = -1 + try_host_num += 1 + try: + self.init(self.__default_endpoint) + connected = True + except TTransport.TException as e: + continue + break + if connected: + break + return connected def gen_insert_string_records_of_one_device_request( self, diff --git a/client-py/iotdb/utils/IoTDBRpcDataSet.py b/client-py/iotdb/utils/IoTDBRpcDataSet.py index e9dfa7bfe6..6d31629340 100644 --- a/client-py/iotdb/utils/IoTDBRpcDataSet.py +++ b/client-py/iotdb/utils/IoTDBRpcDataSet.py @@ -243,7 +243,9 @@ class IoTDBRpcDataSet(object): if len(data_array) < total_length: if data_type == TSDataType.INT32 or data_type == TSDataType.INT64: tmp_array = np.full(total_length, np.nan, np.float32) - elif data_type == TSDataType.FLOAT or data_type == TSDataType.DOUBLE: + elif ( + data_type == TSDataType.FLOAT or data_type == TSDataType.DOUBLE + ): tmp_array = np.full(total_length, np.nan, data_array.dtype) elif data_type == TSDataType.BOOLEAN: tmp_array = np.full(total_length, np.nan, np.float32) @@ -252,7 +254,7 @@ class IoTDBRpcDataSet(object): bitmap_buffer = self.__query_data_set.bitmapList[location] bitmap_str = self._to_bitstring(bitmap_buffer) - bit_mask = (np.fromstring(bitmap_str, 'u1') - ord('0')).astype(bool) + bit_mask = (np.fromstring(bitmap_str, "u1") - ord("0")).astype(bool) if len(bit_mask) != total_length: bit_mask = bit_mask[:total_length] tmp_array[bit_mask] = data_array diff --git a/client-py/iotdb/utils/NumpyTablet.py b/client-py/iotdb/utils/NumpyTablet.py index 7315b872e5..dbb0d354db 100644 --- a/client-py/iotdb/utils/NumpyTablet.py +++ b/client-py/iotdb/utils/NumpyTablet.py @@ -22,7 +22,9 @@ from iotdb.utils.BitMap import BitMap class NumpyTablet(object): - def __init__(self, device_id, measurements, data_types, values, timestamps, bitmaps=None): + def __init__( + self, device_id, measurements, data_types, values, timestamps, bitmaps=None + ): """ creating a numpy tablet for insertion for example, considering device: root.sg1.d1 diff --git a/client-py/tests/tablet_performance_comparison.py b/client-py/tests/tablet_performance_comparison.py index ef5847140c..3626e818a8 100644 --- a/client-py/tests/tablet_performance_comparison.py +++ b/client-py/tests/tablet_performance_comparison.py @@ -75,9 +75,9 @@ def generate_csv_data( if _type == TSDataType.BOOLEAN: return [random.randint(0, 1) == 1 for _ in range(_row)] elif _type == TSDataType.INT32: - return [random.randint(-(2 ** 31), 2 ** 31) for _ in range(_row)] + return [random.randint(-(2**31), 2**31) for _ in range(_row)] elif _type == TSDataType.INT64: - return [random.randint(-(2 ** 63), 2 ** 63) for _ in range(_row)] + return [random.randint(-(2**63), 2**63) for _ in range(_row)] elif _type == TSDataType.FLOAT: return [1.5 for _ in range(_row)] elif _type == TSDataType.DOUBLE: diff --git a/client-py/tests/test_dataframe.py b/client-py/tests/test_dataframe.py index e7492a3b14..801b7c1bcd 100644 --- a/client-py/tests/test_dataframe.py +++ b/client-py/tests/test_dataframe.py @@ -53,7 +53,9 @@ def test_non_time_query(): session.insert_str_record("root.device0", 123, "pressure", "15.0") # Read - session_data_set = session.execute_query_statement("SHOW TIMESERIES root.device0.*") + session_data_set = session.execute_query_statement( + "SHOW TIMESERIES root.device0.*" + ) df = session_data_set.todf() session.close() @@ -68,7 +70,7 @@ def test_non_time_query(): "Tags", "Attributes", "Deadband", - "DeadbandParameters" + "DeadbandParameters", ] assert_array_equal( df.values, diff --git a/client-py/tests/test_delete_data.py b/client-py/tests/test_delete_data.py index 031e8ef6e6..8f44574726 100644 --- a/client-py/tests/test_delete_data.py +++ b/client-py/tests/test_delete_data.py @@ -102,7 +102,9 @@ def test_delete_date(): print_message("insert aligned record of one device failed") # execute delete data - session.delete_data(["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1) + session.delete_data( + ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1 + ) # execute raw data query sql statement session_data_set = session.execute_raw_data_query( @@ -127,7 +129,9 @@ def test_delete_date(): assert actual_count == expect_count # execute delete data - session.delete_data_in_range(["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 2, 3) + session.delete_data_in_range( + ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 2, 3 + ) # execute raw data query sql statement session_data_set = session.execute_raw_data_query( diff --git a/client-py/tests/test_numpy_tablet.py b/client-py/tests/test_numpy_tablet.py index ddcb17b070..217df22ea2 100644 --- a/client-py/tests/test_numpy_tablet.py +++ b/client-py/tests/test_numpy_tablet.py @@ -100,7 +100,12 @@ def test_numpy_tablet_with_none_serialization(): np_bitmaps_[4].mark(3) np_bitmaps_[5].mark(3) np_tablet_ = NumpyTablet( - "root.sg_test_01.d_01", measurements_, data_types_, np_values_, np_timestamps_, np_bitmaps_ + "root.sg_test_01.d_01", + measurements_, + data_types_, + np_values_, + np_timestamps_, + np_bitmaps_, ) assert tablet_.get_binary_timestamps() == np_tablet_.get_binary_timestamps() assert tablet_.get_binary_values() == np_tablet_.get_binary_values() diff --git a/client-py/tests/test_session.py b/client-py/tests/test_session.py index 6fbddb9820..8a466b8717 100644 --- a/client-py/tests/test_session.py +++ b/client-py/tests/test_session.py @@ -311,7 +311,12 @@ def test_session(): np_bitmaps_[4].mark(3) np_bitmaps_[5].mark(3) np_tablet_ = NumpyTablet( - "root.sg_test_01.d_01", measurements_, data_types_, np_values_, np_timestamps_, np_bitmaps_ + "root.sg_test_01.d_01", + measurements_, + data_types_, + np_values_, + np_timestamps_, + np_bitmaps_, ) if session.insert_tablet(np_tablet_) < 0: test_fail() diff --git a/client-py/tests/test_template.py b/client-py/tests/test_template.py index e91a8a0990..93cfca40cf 100644 --- a/client-py/tests/test_template.py +++ b/client-py/tests/test_template.py @@ -92,12 +92,18 @@ def test_set_template(): session.execute_non_query_statement("CREATE DATABASE root.python") session.set_schema_template(template_name, "root.python.GPS") - session.execute_non_query_statement("create timeseries of schema template on root.python.GPS") + session.execute_non_query_statement( + "create timeseries of schema template on root.python.GPS" + ) assert session.show_paths_template_set_on(template_name) == ["root.python.GPS"] - assert session.show_paths_template_using_on(template_name) == ["root.python.GPS"] + assert session.show_paths_template_using_on(template_name) == [ + "root.python.GPS" + ] - session.execute_non_query_statement("delete timeseries of schema template from root.python.GPS") + session.execute_non_query_statement( + "delete timeseries of schema template from root.python.GPS" + ) session.unset_schema_template(template_name, "root.python.GPS") session.drop_schema_template(template_name)
