This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch IOTDB-1801 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3a4ebbe375f1eda3472c50a37ac2f41eedf10b5c Author: HTHou <[email protected]> AuthorDate: Mon Jan 10 14:20:22 2022 +0800 [IOTDB-1801] Python APIs for aligned timeseries --- client-py/SessionAlignedTimeseriesExample.py | 197 +++++++++++++++++++ client-py/SessionAlignedTimeseriesTest.py | 280 +++++++++++++++++++++++++++ client-py/iotdb/Session.py | 209 +++++++++++++++++++- 3 files changed, 676 insertions(+), 10 deletions(-) diff --git a/client-py/SessionAlignedTimeseriesExample.py b/client-py/SessionAlignedTimeseriesExample.py new file mode 100644 index 0000000..a54b169 --- /dev/null +++ b/client-py/SessionAlignedTimeseriesExample.py @@ -0,0 +1,197 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Uncomment the following line to use apache-iotdb module installed by pip3 + +from iotdb.Session import Session +from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor +from iotdb.utils.Tablet import Tablet + +# creating session connection. +ip = "127.0.0.1" +port_ = "6667" +username_ = "root" +password_ = "root" +session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") +session.open(False) + +# set and delete storage groups +session.set_storage_group("root.sg_test_01") +session.set_storage_group("root.sg_test_02") +session.set_storage_group("root.sg_test_03") +session.set_storage_group("root.sg_test_04") +session.delete_storage_group("root.sg_test_02") +session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) + +# setting aligned time series. +measurements_lst_ = [ + "s_01", + "s_02", + "s_03", +] +data_type_lst_ = [ + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, +] +encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))] +compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))] +session.create_aligned_time_series( + "root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_ +) + +# setting more aligned time series once. +measurements_lst_ = [ + "s_04", + "s_05", + "s_06", + "s_07", + "s_08", + "s_09", +] +data_type_lst_ = [ + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, +] +encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))] +compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))] +session.create_aligned_time_series( + "root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_ +) + +# delete time series +session.delete_time_series( + [ + "root.sg_test_01.d_02.s_07", + "root.sg_test_01.d_02.s_08", + "root.sg_test_01.d_02.s_09", + ] +) + +# checking time series +print( + "s_07 expecting False, checking result: ", + session.check_time_series_exists("root.sg_test_01.d_02.s_07"), +) +print( + "s_03 expecting True, checking result: ", + session.check_time_series_exists("root.sg_test_01.d_02.s_03"), +) + +# insert one aligned record into the database. +measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"] +values_ = [False, 10, 11, 1.1, 10011.1, "test_record"] +data_types_ = [ + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, +] +session.insert_aligned_record("root.sg_test_01.d_02", 1, measurements_, data_types_, values_) + +# insert multiple aligned records into database +measurements_list_ = [ + ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], + ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], +] +values_list_ = [ + [False, 22, 33, 4.4, 55.1, "test_records01"], + [True, 77, 88, 1.25, 8.125, "test_records02"], +] +data_type_list_ = [data_types_, data_types_] +device_ids_ = ["root.sg_test_01.d_02", "root.sg_test_01.d_02"] +session.insert_aligned_records( + device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_ +) + +# insert one aligned tablet into the database. +values_ = [ + [False, 10, 11, 1.1, 10011.1, "test01"], + [True, 100, 11111, 1.25, 101.0, "test02"], + [False, 100, 1, 188.1, 688.25, "test03"], + [True, 0, 0, 0, 6.25, "test04"], +] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. +timestamps_ = [4, 5, 6, 7] +tablet_ = Tablet( + "root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_ +) +session.insert_aligned_tablet(tablet_) + +# insert multiple aligned tablets into database +tablet_01 = Tablet( + "root.sg_test_01.d_02", measurements_, data_types_, values_, [8, 9, 10, 11] +) +tablet_02 = Tablet( + "root.sg_test_01.d_02", measurements_, data_types_, values_, [12, 13, 14, 15] +) +session.insert_aligned_tablets([tablet_01, tablet_02]) + +# insert one aligned tablet with empty cells into the database. +values_ = [ + [None, 10, 11, 1.1, 10011.1, "test01"], + [True, None, 11111, 1.25, 101.0, "test02"], + [False, 100, 1, None, 688.25, "test03"], + [True, 0, 0, 0, 6.25, None], +] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. +timestamps_ = [16, 17, 18, 19] +tablet_ = Tablet( + "root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_ +) +session.insert_aligned_tablet(tablet_) + +# insert aligned records of one device +time_list = [1, 2, 3] +measurements_list = [ + ["s_01", "s_02", "s_03"], + ["s_01", "s_02", "s_03"], + ["s_01", "s_02", "s_03"], +] +data_types_list = [ + [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], + [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], + [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], +] +values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]] + +session.insert_aligned_records_of_one_device( + "root.sg_test_01.d_02", time_list, measurements_list, data_types_list, values_list +) + +# execute non-query sql statement +session.execute_non_query_statement( + "insert into root.sg_test_01.d_02(timestamp, s_02) aligned values(16, 188)" +) + +# execute sql query statement +with session.execute_query_statement( + "select * from root.sg_test_01.d_02" +) as session_data_set: + session_data_set.set_fetch_size(1024) + while session_data_set.has_next(): + print(session_data_set.next()) + +# close session connection. +session.close() + +print("All executions done!!") diff --git a/client-py/SessionAlignedTimeseriesTest.py b/client-py/SessionAlignedTimeseriesTest.py new file mode 100644 index 0000000..fbd36df --- /dev/null +++ b/client-py/SessionAlignedTimeseriesTest.py @@ -0,0 +1,280 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Uncomment the following line to use apache-iotdb module installed by pip3 +from iotdb.Session import Session +from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor +from iotdb.utils.Tablet import Tablet + +# whether the test has passed +final_flag = True +failed_count = 0 + + +def test_fail(): + global failed_count + global final_flag + final_flag = False + failed_count += 1 + + +def print_message(message): + print("*********") + print(message) + print("*********") + + +# creating session connection. +ip = "127.0.0.1" +port_ = "6667" +username_ = "root" +password_ = "root" +session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") +session.open(False) + +if not session.is_open(): + print("can't open session") + exit(1) + +# set and delete storage groups +session.set_storage_group("root.sg_test_01") +session.set_storage_group("root.sg_test_02") +session.set_storage_group("root.sg_test_03") +session.set_storage_group("root.sg_test_04") + +if session.delete_storage_group("root.sg_test_02") < 0: + test_fail() + print_message("delete storage group failed") + +if session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) < 0: + test_fail() + print_message("delete storage groups failed") + +# setting aligned time series. +measurements_lst_ = [ + "s_01", + "s_02", + "s_03", +] +data_type_lst_ = [ + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, +] +encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))] +compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))] +session.create_aligned_time_series( + "root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_ +) + +# setting more aligned time series once. +measurements_lst_ = [ + "s_04", + "s_05", + "s_06", + "s_07", + "s_08", + "s_09", +] +data_type_lst_ = [ + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, +] +encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))] +compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))] +session.create_aligned_time_series( + "root.sg_test_01.d_02", measurements_lst_, data_type_lst_, encoding_lst_, compressor_lst_ +) + +# delete time series +if ( + session.delete_time_series( + [ + "root.sg_test_01.d_02.s_07", + "root.sg_test_01.d_02.s_08", + "root.sg_test_01.d_02.s_09", + ] + ) + < 0 +): + test_fail() + print_message("delete time series failed") + +# checking time series +# s_07 expecting False +if session.check_time_series_exists("root.sg_test_01.d_02.s_07"): + test_fail() + print_message("root.sg_test_01.d_02.s_07 shouldn't exist") + +# s_03 expecting True +if not session.check_time_series_exists("root.sg_test_01.d_02.s_03"): + test_fail() + print_message("root.sg_test_01.d_02.s_03 should exist") + +# insert one record into the database. +measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"] +values_ = [False, 10, 11, 1.1, 10011.1, "test_record"] +data_types_ = [ + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TEXT, +] +if ( + session.insert_aligned_record( + "root.sg_test_01.d_02", 1, measurements_, data_types_, values_ + ) + < 0 +): + test_fail() + print_message("insert record failed") + +# insert multiple records into database +measurements_list_ = [ + ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], + ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], +] +values_list_ = [ + [False, 22, 33, 4.4, 55.1, "test_records01"], + [True, 77, 88, 1.25, 8.125, "test_records02"], +] +data_type_list_ = [data_types_, data_types_] +device_ids_ = ["root.sg_test_01.d_02", "root.sg_test_01.d_02"] +if ( + session.insert_aligned_records( + device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_ + ) + < 0 +): + test_fail() + print_message("insert records failed") + +# insert one tablet into the database. +values_ = [ + [False, 10, 11, 1.1, 10011.1, "test01"], + [True, 100, 11111, 1.25, 101.0, "test02"], + [False, 100, 1, 188.1, 688.25, "test03"], + [True, 0, 0, 0, 6.25, "test04"], +] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. +timestamps_ = [4, 5, 6, 7] +tablet_ = Tablet( + "root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_ +) +if session.insert_aligned_tablet(tablet_) < 0: + test_fail() + print_message("insert tablet failed") + +# insert multiple tablets into database +tablet_01 = Tablet( + "root.sg_test_01.d_02", measurements_, data_types_, values_, [8, 9, 10, 11] +) +tablet_02 = Tablet( + "root.sg_test_01.d_02", measurements_, data_types_, values_, [12, 13, 14, 15] +) +if session.insert_aligned_tablets([tablet_01, tablet_02]) < 0: + test_fail() + print_message("insert tablets failed") + +# insert one tablet with empty cells into the database. +values_ = [ + [None, 10, 11, 1.1, 10011.1, "test01"], + [True, None, 11111, 1.25, 101.0, "test02"], + [False, 100, 1, None, 688.25, "test03"], + [True, 0, 0, 0, None, None], +] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. +timestamps_ = [20, 21, 22, 23] +tablet_ = Tablet( + "root.sg_test_01.d_02", measurements_, data_types_, values_, timestamps_ +) +if session.insert_aligned_tablet(tablet_) < 0: + test_fail() + print_message("insert tablet with empty cells failed") + +# insert records of one device +time_list = [1, 2, 3] +measurements_list = [ + ["s_01", "s_02", "s_03"], + ["s_01", "s_02", "s_03"], + ["s_01", "s_02", "s_03"], +] +data_types_list = [ + [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], + [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], + [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], +] +values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]] + +if ( + session.insert_aligned_records_of_one_device( + "root.sg_test_01.d_02", + time_list, + measurements_list, + data_types_list, + values_list, + ) + < 0 +): + test_fail() + print_message("insert records of one device failed") + +# execute non-query sql statement +if ( + session.execute_non_query_statement( + "insert into root.sg_test_01.d_02(timestamp, s_02) aligned values(16, 188)" + ) + < 0 +): + test_fail() + print_message( + "execute 'insert into root.sg_test_01.d_02(timestamp, s_02) aligned values(16, 188)' failed" + ) + +# execute sql query statement +session_data_set = session.execute_query_statement("select * from root.sg_test_01.d_02") +session_data_set.set_fetch_size(1024) +expect_count = 20 +actual_count = 0 +while session_data_set.has_next(): + print(session_data_set.next()) + actual_count += 1 +session_data_set.close_operation_handle() + +if actual_count != expect_count: + test_fail() + print_message( + "query count mismatch: expect count: " + + str(expect_count) + + " actual count: " + + str(actual_count) + ) + +# close session connection. +session.close() + +if final_flag: + print("All executions done!!") +else: + print("Some test failed, please have a check") + print("failed count: ", failed_count) + exit(1) diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py index ccd0fdd..4a825a3 100644 --- a/client-py/iotdb/Session.py +++ b/client-py/iotdb/Session.py @@ -38,7 +38,7 @@ from .thrift.rpc.TSIService import ( TSInsertRecordsReq, TSInsertRecordsOfOneDeviceReq, ) -from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq +from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, TSSetTimeZoneReq, TSCreateAlignedTimeseriesReq # for debug # from IoTDBConstants import * @@ -211,6 +211,33 @@ class Session(object): return Session.verify_success(status) + def create_aligned_time_series( + self, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst + ): + """ + create aligned time series + :param device_id: String, device id for timeseries (starts from root) + :param measurements_lst: List of String, measurement ids for time series + :param data_type_lst: List of TSDataType, data types for time series + :param encoding_lst: List of TSEncoding, encodings for time series + :param compressor_lst: List of Compressor, compressing types for time series + """ + data_type_lst = [data_type.value for data_type in data_type_lst] + encoding_lst = [encoding.value for encoding in encoding_lst] + compressor_lst = [compressor.value for compressor in compressor_lst] + + request = TSCreateAlignedTimeseriesReq( + self.__session_id, device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst + ) + status = self.__client.createAlignedTimeseries(request) + logger.debug( + "creating aligned time series of device {} message: {}".format( + measurements_lst, status.message + ) + ) + + return Session.verify_success(status) + def create_multi_time_series( self, ts_path_lst, data_type_lst, encoding_lst, compressor_lst ): @@ -296,6 +323,25 @@ class Session(object): return Session.verify_success(status) + def insert_aligned_str_record(self, device_id, timestamp, measurements, string_values): + """ special case for inserting one row of String (TEXT) value """ + if type(string_values) == str: + string_values = [string_values] + if type(measurements) == str: + measurements = [measurements] + data_types = [TSDataType.TEXT.value for _ in string_values] + request = self.gen_insert_str_record_req( + device_id, timestamp, measurements, data_types, string_values, True + ) + status = self.__client.insertStringRecord(request) + logger.debug( + "insert one record to device {} message: {}".format( + device_id, status.message + ) + ) + + return Session.verify_success(status) + def insert_record(self, device_id, timestamp, measurements, data_types, values): """ insert one row of record into database, if you want improve your performance, please use insertTablet method @@ -348,6 +394,58 @@ class Session(object): ) return Session.verify_success(status) + def insert_aligned_record(self, device_id, timestamp, measurements, data_types, values): + """ + insert one row of aligned record into database, if you want improve your performance, please use insertTablet method + for example a record at time=10086 with three measurements is: + timestamp, m1, m2, m3 + 10086, 125.3, True, text1 + :param device_id: String, time series path for device + :param timestamp: Integer, indicate the timestamp of the row of data + :param measurements: List of String, sensor names + :param data_types: List of TSDataType, indicate the data type for each sensor + :param values: List, values to be inserted, for each sensor + """ + data_types = [data_type.value for data_type in data_types] + request = self.gen_insert_record_req( + device_id, timestamp, measurements, data_types, values, True + ) + status = self.__client.insertRecord(request) + logger.debug( + "insert one record to device {} message: {}".format( + device_id, status.message + ) + ) + + return Session.verify_success(status) + + def insert_aligned_records( + self, device_ids, times, measurements_lst, types_lst, values_lst + ): + """ + insert multiple aligned rows of data, records are independent to each other, in other words, there's no relationship + between those records + :param device_ids: List of String, time series paths for device + :param times: List of Integer, timestamps for records + :param measurements_lst: 2-D List of String, each element of outer list indicates measurements of a device + :param types_lst: 2-D List of TSDataType, each element of outer list indicates sensor data types of a device + :param values_lst: 2-D List, values to be inserted, for each device + """ + type_values_lst = [] + for types in types_lst: + data_types = [data_type.value for data_type in types] + type_values_lst.append(data_types) + request = self.gen_insert_records_req( + device_ids, times, measurements_lst, type_values_lst, values_lst, True + ) + status = self.__client.insertRecords(request) + logger.debug( + "insert multiple records to devices {} message: {}".format( + device_ids, status.message + ) + ) + + return Session.verify_success(status) def test_insert_record( self, device_id, timestamp, measurements, data_types, values @@ -401,7 +499,7 @@ class Session(object): return Session.verify_success(status) def gen_insert_record_req( - self, device_id, timestamp, measurements, data_types, values + self, device_id, timestamp, measurements, data_types, values, is_aligned=False ): if (len(values) != len(data_types)) or (len(values) != len(measurements)): raise RuntimeError( @@ -409,22 +507,22 @@ class Session(object): ) values_in_bytes = Session.value_to_bytes(data_types, values) return TSInsertRecordReq( - self.__session_id, device_id, measurements, values_in_bytes, timestamp + self.__session_id, device_id, measurements, values_in_bytes, timestamp, is_aligned ) def gen_insert_str_record_req( - self, device_id, timestamp, measurements, data_types, values + self, device_id, timestamp, measurements, data_types, values, is_aligned=False ): if (len(values) != len(data_types)) or (len(values) != len(measurements)): raise RuntimeError( "length of data types does not equal to length of values!" ) return TSInsertStringRecordReq( - self.__session_id, device_id, measurements, values, timestamp + self.__session_id, device_id, measurements, values, timestamp, is_aligned ) def gen_insert_records_req( - self, device_ids, times, measurements_lst, types_lst, values_lst + self, device_ids, times, measurements_lst, types_lst, values_lst, is_aligned=False ): if ( (len(device_ids) != len(measurements_lst)) @@ -448,7 +546,7 @@ class Session(object): value_lst.append(values_in_bytes) return TSInsertRecordsReq( - self.__session_id, device_ids, measurements_lst, value_lst, times + self.__session_id, device_ids, measurements_lst, value_lst, times, is_aligned ) def insert_tablet(self, tablet): @@ -482,6 +580,37 @@ class Session(object): return Session.verify_success(status) + def insert_aligned_tablet(self, tablet): + """ + insert one aligned tablet, in a tablet, for each timestamp, the number of measurements is same + for example three records in the same device can form a tablet: + timestamps, m1, m2, m3 + 1, 125.3, True, text1 + 2, 111.6, False, text2 + 3, 688.6, True, text3 + Notice: From 0.13.0, the tablet can contain empty cell + The tablet itself is sorted (see docs of Tablet.py) + :param tablet: a tablet specified above + """ + status = self.__client.insertTablet(self.gen_insert_tablet_req(tablet, True)) + logger.debug( + "insert one tablet to device {} message: {}".format( + tablet.get_device_id(), status.message + ) + ) + + return Session.verify_success(status) + + def insert_aligned_tablets(self, tablet_lst): + """ + insert multiple aligned tablets, tablets are independent to each other + :param tablet_lst: List of tablets + """ + status = self.__client.insertTablets(self.gen_insert_tablets_req(tablet_lst, True)) + logger.debug("insert multiple tablets, message: {}".format(status.message)) + + return Session.verify_success(status) + def insert_records_of_one_device( self, device_id, times_list, measurements_list, types_list, values_list ): @@ -540,8 +669,65 @@ class Session(object): return Session.verify_success(status) + def insert_aligned_records_of_one_device( + self, device_id, times_list, measurements_list, types_list, values_list + ): + # sort by timestamp + sorted_zipped = sorted( + zip(times_list, measurements_list, types_list, values_list) + ) + result = zip(*sorted_zipped) + times_list, measurements_list, types_list, values_list = [ + list(x) for x in result + ] + + return self.insert_aligned_records_of_one_device_sorted( + device_id, times_list, measurements_list, types_list, values_list + ) + + def insert_aligned_records_of_one_device_sorted( + self, device_id, times_list, measurements_list, types_list, values_list + ): + """ + Insert multiple aligned rows, which can reduce the overhead of network. This method is just like jdbc + executeBatch, we pack some insert request in batch and send them to server. If you want to improve + your performance, please see insertTablet method + + :param device_id: device id + :param times_list: timestamps list + :param measurements_list: measurements list + :param types_list: types list + :param values_list: values list + """ + # check parameter + size = len(times_list) + if ( + size != len(measurements_list) + or size != len(types_list) + or size != len(values_list) + ): + raise RuntimeError( + "insert records of one device error: types, times, measurementsList and valuesList's size should be equal" + ) + + # check sorted + if not Session.check_sorted(times_list): + raise RuntimeError( + "insert records of one device error: timestamp not sorted" + ) + + request = self.gen_insert_records_of_one_device_request( + device_id, times_list, measurements_list, values_list, types_list, True + ) + + # send request + status = self.__client.insertRecordsOfOneDevice(request) + logger.debug("insert records of one device, message: {}".format(status.message)) + + return Session.verify_success(status) + def gen_insert_records_of_one_device_request( - self, device_id, times_list, measurements_list, values_list, types_list + self, device_id, times_list, measurements_list, values_list, types_list, is_aligned=False ): binary_value_list = [] for values, data_types, measurements in zip( @@ -561,6 +747,7 @@ class Session(object): measurements_list, binary_value_list, times_list, + is_aligned ) def test_insert_tablet(self, tablet): @@ -593,7 +780,7 @@ class Session(object): return Session.verify_success(status) - def gen_insert_tablet_req(self, tablet): + def gen_insert_tablet_req(self, tablet, is_aligned=False): data_type_values = [data_type.value for data_type in tablet.get_data_types()] return TSInsertTabletReq( self.__session_id, @@ -603,9 +790,10 @@ class Session(object): tablet.get_binary_timestamps(), data_type_values, tablet.get_row_number(), + is_aligned, ) - def gen_insert_tablets_req(self, tablet_lst): + def gen_insert_tablets_req(self, tablet_lst, is_aligned=False): device_id_lst = [] measurements_lst = [] values_lst = [] @@ -630,6 +818,7 @@ class Session(object): timestamps_lst, type_lst, size_lst, + is_aligned, ) def execute_query_statement(self, sql, timeout=0):
