This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch opt_py_dataset in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7459576a1ba41b7ec6c4ebb53fcba9c28dbd7dbc Author: HTHou <[email protected]> AuthorDate: Wed Sep 27 18:24:32 2023 +0800 optmize more --- iotdb-client/client-py/SessionExample.py | 2 +- iotdb-client/client-py/iotdb/utils/Field.py | 36 +++--- .../client-py/iotdb/utils/IoTDBRpcDataSet.py | 131 ++++++--------------- .../client-py/iotdb/utils/SessionDataSet.py | 53 +++++---- 4 files changed, 86 insertions(+), 136 deletions(-) diff --git a/iotdb-client/client-py/SessionExample.py b/iotdb-client/client-py/SessionExample.py index 79680acd68c..abb7db28143 100644 --- a/iotdb-client/client-py/SessionExample.py +++ b/iotdb-client/client-py/SessionExample.py @@ -26,7 +26,7 @@ session.open() # # print("todf cost: " + str(int(datetime.now().timestamp() * 1000) - startTime) + "ms") startTime = int(datetime.now().timestamp() * 1000) -with session.execute_query_statement("select ** from root") as data_set: +with session.execute_query_statement("select * from root.**") as data_set: data_set.get_column_names() data_set.get_column_types() while data_set.has_next(): diff --git a/iotdb-client/client-py/iotdb/utils/Field.py b/iotdb-client/client-py/iotdb/utils/Field.py index 281f005f554..9dd630df1c2 100644 --- a/iotdb-client/client-py/iotdb/utils/Field.py +++ b/iotdb-client/client-py/iotdb/utils/Field.py @@ -26,7 +26,7 @@ class Field(object): :param data_type: TSDataType """ self.__data_type = data_type - self.__value = value + self.value = value @staticmethod def copy(field): @@ -57,72 +57,72 @@ class Field(object): return self.__data_type is None def set_bool_value(self, value): - self.__value = value + self.value = value def get_bool_value(self): if self.__data_type is None: raise Exception("Null Field Exception!") if self.__data_type != TSDataType.BOOLEAN: return None - return self.__value + return self.value def set_int_value(self, value): - self.__value = value + self.value = value def get_int_value(self): if self.__data_type is None: raise Exception("Null Field Exception!") if self.__data_type != TSDataType.INT32: return None - return self.__value + return self.value def set_long_value(self, value): - self.__value = value + self.value = value def get_long_value(self): if self.__data_type is None: raise Exception("Null Field Exception!") if self.__data_type != TSDataType.INT64: return None - return self.__value + return self.value def set_float_value(self, value): - self.__value = value + self.value = value def get_float_value(self): if self.__data_type is None: raise Exception("Null Field Exception!") if self.__data_type != TSDataType.FLOAT: return None - return self.__value + return self.value def set_double_value(self, value): - self.__value = value + self.value = value def get_double_value(self): if self.__data_type is None: raise Exception("Null Field Exception!") if self.__data_type != TSDataType.DOUBLE: return None - return self.__value + return self.value def set_binary_value(self, value): - self.__value = value + self.value = value def get_binary_value(self): if self.__data_type is None: raise Exception("Null Field Exception!") if self.__data_type != TSDataType.TEXT: return None - return self.__value + return self.value def get_string_value(self): - if self.__data_type is None: + if self.__data_type is None or self.value is None: return "None" elif self.__data_type == TSDataType.TEXT: - return self.__value.decode("utf-8") + return self.value.decode("utf-8") else: - return str(self.__value) + return str(self.value) def __str__(self): return self.get_string_value() @@ -133,10 +133,10 @@ class Field(object): """ if self.__data_type is None: return None - return self.__value + return self.value def set_value(self, value): - self.__value = value + self.value = value @staticmethod def get_field(value, data_type): diff --git a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py index b060260c8f5..df349988547 100644 --- a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py +++ b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py @@ -55,65 +55,62 @@ class IoTDBRpcDataSet(object): ): self.__statement_id = statement_id self.__session_id = session_id - self.__ignore_timestamp = ignore_timestamp + self.ignore_timestamp = ignore_timestamp self.__sql = sql self.__query_id = query_id self.__client = client self.__fetch_size = fetch_size - self.__column_size = len(column_name_list) + self.column_size = len(column_name_list) self.__default_time_out = 1000 self.__column_name_list = [] self.__column_type_list = [] - self.__column_ordinal_dict = {} + self.column_ordinal_dict = {} if not ignore_timestamp: self.__column_name_list.append(IoTDBRpcDataSet.TIMESTAMP_STR) - # TSDataType.INT64 for time - self.__column_type_list.append(2) - self.__column_ordinal_dict[IoTDBRpcDataSet.TIMESTAMP_STR] = 1 + self.__column_type_list.append(TSDataType.INT64) + self.column_ordinal_dict[IoTDBRpcDataSet.TIMESTAMP_STR] = 1 if column_name_index is not None: - self.__column_type_deduplicated_list = [ + self.column_type_deduplicated_list = [ None for _ in range(len(column_name_index)) ] for i in range(len(column_name_list)): name = column_name_list[i] self.__column_name_list.append(name) self.__column_type_list.append(TSDataType[column_type_list[i]]) - if name not in self.__column_ordinal_dict: + if name not in self.column_ordinal_dict: index = column_name_index[name] - self.__column_ordinal_dict[name] = ( - index + IoTDBRpcDataSet.START_INDEX - ) - self.__column_type_deduplicated_list[index] = TSDataType[ + self.column_ordinal_dict[name] = index + IoTDBRpcDataSet.START_INDEX + self.column_type_deduplicated_list[index] = TSDataType[ column_type_list[i] ] else: index = IoTDBRpcDataSet.START_INDEX - self.__column_type_deduplicated_list = [] + self.column_type_deduplicated_list = [] for i in range(len(column_name_list)): name = column_name_list[i] self.__column_name_list.append(name) self.__column_type_list.append(TSDataType[column_type_list[i]]) - if name not in self.__column_ordinal_dict: - self.__column_ordinal_dict[name] = index + if name not in self.column_ordinal_dict: + self.column_ordinal_dict[name] = index index += 1 - self.__column_type_deduplicated_list.append( + self.column_type_deduplicated_list.append( TSDataType[column_type_list[i]] ) - self.__time_bytes = bytes(0) + self.time_bytes = bytes(0) self.__current_bitmap = [ - bytes(0) for _ in range(len(self.__column_type_deduplicated_list)) + bytes(0) for _ in range(len(self.column_type_deduplicated_list)) ] - self.__value = [None for _ in range(len(self.__column_type_deduplicated_list))] + self.value = [None for _ in range(len(self.column_type_deduplicated_list))] self.__query_data_set = query_data_set self.__is_closed = False self.__empty_resultSet = False - self.__has_cached_record = False + self.has_cached_record = False self.__rows_index = 0 - self.__is_null_info = [ - False for _ in range(len(self.__column_type_deduplicated_list)) + self.is_null_info = [ + False for _ in range(len(self.column_type_deduplicated_list)) ] def close(self): @@ -174,28 +171,24 @@ class IoTDBRpcDataSet(object): ) if time_array.dtype.byteorder == ">": time_array = time_array.byteswap().newbyteorder("<") - if ( - self.get_ignore_timestamp() is None - or self.get_ignore_timestamp() is False - ): + if self.ignore_timestamp is None or self.ignore_timestamp is False: result[IoTDBRpcDataSet.TIMESTAMP_STR].append(time_array) self.__query_data_set.time = [] total_length = len(time_array) for i in range(len(self.__query_data_set.bitmapList)): - if self.get_ignore_timestamp() is True: + if self.ignore_timestamp is True: column_name = self.get_column_names()[i] else: column_name = self.get_column_names()[i + 1] location = ( - self.__column_ordinal_dict[column_name] - - IoTDBRpcDataSet.START_INDEX + self.column_ordinal_dict[column_name] - IoTDBRpcDataSet.START_INDEX ) if location < 0: continue - data_type = self.__column_type_deduplicated_list[location] + data_type = self.column_type_deduplicated_list[location] value_buffer = self.__query_data_set.valueList[location] value_buffer_len = len(value_buffer) @@ -286,40 +279,39 @@ class IoTDBRpcDataSet(object): def construct_one_row(self): # simulating buffer, read 8 bytes from data set and discard first 8 bytes which have been read. - self.__time_bytes = self.__query_data_set.time[:8] + self.time_bytes = self.__query_data_set.time[:8] self.__query_data_set.time = self.__query_data_set.time[8:] for i, bitmap_buffer in enumerate(self.__query_data_set.bitmapList): + bitmap = self.__current_bitmap[i] # another 8 new rows, should move the bitmap buffer position to next byte if self.__rows_index % 8 == 0: - self.__current_bitmap[i] = bitmap_buffer[0] + bitmap = self.__current_bitmap[i] = bitmap_buffer[0] self.__query_data_set.bitmapList[i] = bitmap_buffer[1:] - if not self.is_null(self.__current_bitmap[i], self.__rows_index): - self.__is_null_info[i] = False + is_null = shift_table[self.__rows_index % 8] & bitmap == 0 + self.is_null_info[i] = is_null + if not is_null: value_buffer = self.__query_data_set.valueList[i] - data_type = self.__column_type_deduplicated_list[i] - + data_type = self.column_type_deduplicated_list[i] # simulating buffer if data_type == 0: - self.__value[i] = value_buffer[:1] + self.value[i] = value_buffer[:1] self.__query_data_set.valueList[i] = value_buffer[1:] elif data_type == 1 or data_type == 3: - self.__value[i] = value_buffer[:4] + self.value[i] = value_buffer[:4] self.__query_data_set.valueList[i] = value_buffer[4:] elif data_type == 2 or data_type == 4: - self.__value[i] = value_buffer[:8] + self.value[i] = value_buffer[:8] self.__query_data_set.valueList[i] = value_buffer[8:] elif data_type == 5: length = int.from_bytes( value_buffer[:4], byteorder="big", signed=False ) - self.__value[i] = value_buffer[4 : 4 + length] + self.value[i] = value_buffer[4 : 4 + length] self.__query_data_set.valueList[i] = value_buffer[4 + length :] else: raise RuntimeError("unsupported data type {}.".format(data_type)) - else: - self.__is_null_info[i] = True self.__rows_index += 1 - self.__has_cached_record = True + self.has_cached_record = True def fetch_results(self): self.__rows_index = 0 @@ -343,42 +335,12 @@ class IoTDBRpcDataSet(object): "Cannot fetch result from server, because of network connection: ", e ) - @staticmethod - def is_null(bitmap, row_num): - shift = shift_table[row_num % 8] - return shift & bitmap == 0 - - def is_null_by_index(self, column_index): - index = ( - self.__column_ordinal_dict[self.find_column_name_by_index(column_index)] - - IoTDBRpcDataSet.START_INDEX - ) - # time column will never be None - if index < 0: - return True - return self.is_null(self.__current_bitmap[index], self.__rows_index - 1) - - def is_null_by_name(self, column_name): - index = self.__column_ordinal_dict[column_name] - IoTDBRpcDataSet.START_INDEX - # time column will never be None - if index < 0: - return True - return self.is_null(self.__current_bitmap[index], self.__rows_index - 1) - - def is_null_by_location(self, location): - # time column will never be None - if location < 0: - return True - return self.__is_null_info[location] - def find_column_name_by_index(self, column_index): if column_index <= 0: raise Exception("Column index should start from 1") if column_index > len(self.__column_name_list): raise Exception( - "column index {} out of range {}".format( - column_index, self.__column_size - ) + "column index {} out of range {}".format(column_index, self.column_size) ) return self.__column_name_list[column_index - 1] @@ -393,24 +355,3 @@ class IoTDBRpcDataSet(object): def get_column_types(self): return self.__column_type_list - - def get_column_size(self): - return self.__column_size - - def get_ignore_timestamp(self): - return self.__ignore_timestamp - - def get_column_ordinal_dict(self): - return self.__column_ordinal_dict - - def get_column_type_deduplicated_list(self): - return self.__column_type_deduplicated_list - - def get_values(self): - return self.__value - - def get_time_bytes(self): - return self.__time_bytes - - def get_has_cached_record(self): - return self.__has_cached_record diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py index 162755a3767..ea2c9cc6ebc 100644 --- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py +++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py @@ -57,14 +57,23 @@ class SessionDataSet(object): query_data_set, 1024, ) - self.column_size = self.iotdb_rpc_data_set.get_column_size() - self.is_ignore_timestamp = self.iotdb_rpc_data_set.get_ignore_timestamp() + self.column_size = self.iotdb_rpc_data_set.column_size + self.is_ignore_timestamp = self.iotdb_rpc_data_set.ignore_timestamp self.column_names = tuple(self.iotdb_rpc_data_set.get_column_names()) - self.column_ordinal_dict = self.iotdb_rpc_data_set.get_column_ordinal_dict() + self.column_ordinal_dict = self.iotdb_rpc_data_set.column_ordinal_dict self.column_type_deduplicated_list = tuple( - self.iotdb_rpc_data_set.get_column_type_deduplicated_list() + self.iotdb_rpc_data_set.column_type_deduplicated_list ) - self.__field_list = [Field(data_type) for data_type in self.column_type_deduplicated_list] + if self.is_ignore_timestamp: + self.__field_list = [ + Field(data_type) + for data_type in self.iotdb_rpc_data_set.get_column_types() + ] + else: + self.__field_list = [ + Field(data_type) + for data_type in self.iotdb_rpc_data_set.get_column_types()[1:] + ] def __enter__(self): return self @@ -88,7 +97,7 @@ class SessionDataSet(object): return self.iotdb_rpc_data_set.next() def next(self): - if not self.iotdb_rpc_data_set.get_has_cached_record(): + if not self.iotdb_rpc_data_set.has_cached_record: if not self.has_next(): return None self.iotdb_rpc_data_set.has_cached_record = False @@ -96,41 +105,41 @@ class SessionDataSet(object): def construct_row_record_from_value_array(self): if self.is_ignore_timestamp: - start = 0 - end = self.column_size + offset = 0 else: - start = 1 - end = self.column_size + 1 - for index in range(start, end): - column_name = self.column_names[index] + offset = 1 + for index in range(self.column_size): + column_name = self.column_names[index + offset] + # IoTDBRpcDataSet.START_INDEX = 2 location = self.column_ordinal_dict[column_name] - 2 - if not self.iotdb_rpc_data_set.is_null_by_location(location): - value_bytes = self.iotdb_rpc_data_set.get_values()[location] + if not self.iotdb_rpc_data_set.is_null_info[location]: + value_bytes = self.iotdb_rpc_data_set.value[location] data_type = self.column_type_deduplicated_list[location] if data_type == 0: value = struct.unpack(">?", value_bytes)[0] - self.__field_list[index - 1].set_value(value) + self.__field_list[index].value = value elif data_type == 1: value = struct.unpack(">i", value_bytes)[0] - self.__field_list[index - 1].set_value(value) + self.__field_list[index].value = value elif data_type == 2: value = struct.unpack(">q", value_bytes)[0] - self.__field_list[index - 1].set_value(value) + self.__field_list[index].value = value elif data_type == 3: value = struct.unpack(">f", value_bytes)[0] - self.__field_list[index - 1].set_value(value) + self.__field_list[index].value = value elif data_type == 4: value = struct.unpack(">d", value_bytes)[0] - self.__field_list[index - 1].set_value(value) + self.__field_list[index].value = value elif data_type == 5: - self.__field_list[index - 1].set_value(value_bytes) + self.__field_list[index].value = value_bytes else: raise RuntimeError("unsupported data type {}.".format(data_type)) else: - self.__field_list[index - 1].set_value(None) + self.__field_list[index].value = None return RowRecord( - struct.unpack(">q", self.iotdb_rpc_data_set.get_time_bytes())[0], self.__field_list + struct.unpack(">q", self.iotdb_rpc_data_set.time_bytes)[0], + self.__field_list, ) def close_operation_handle(self):
