This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch query_v2_py
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/query_v2_py by this push:
new 6c2fd00c9ce dev more
6c2fd00c9ce is described below
commit 6c2fd00c9ced8f5ddd1ed2e299a8d6b2094f4639
Author: HTHou <[email protected]>
AuthorDate: Thu Mar 20 23:53:29 2025 +0800
dev more
---
iotdb-client/client-py/iotdb/Session.py | 24 ++++-
.../client-py/iotdb/utils/IoTDBRpcDataSet.py | 106 +++++++++++----------
.../client-py/iotdb/utils/SessionDataSet.py | 20 ++--
3 files changed, 84 insertions(+), 66 deletions(-)
diff --git a/iotdb-client/client-py/iotdb/Session.py b/iotdb-client/client-py/iotdb/Session.py
index 2796c417d47..dd49958f3a8 100644
--- a/iotdb-client/client-py/iotdb/Session.py
+++ b/iotdb-client/client-py/iotdb/Session.py
@@ -1502,11 +1502,15 @@ class Session(object):
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
+ self.__session_id,
self.__client,
self.__statement_id,
- self.__session_id,
resp.queryResult,
resp.ignoreTimeStamp,
+ timeout,
+ resp.moreData,
+ self.__fetch_size,
+ resp.columnIndex2TsBlockColumnIndexList,
)
def execute_non_query_statement(self, sql):
@@ -1567,11 +1571,15 @@ class Session(object):
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
+ self.__session_id,
self.__client,
self.__statement_id,
- self.__session_id,
resp.queryResult,
resp.ignoreTimeStamp,
+ timeout,
+ resp.moreData,
+ self.__fetch_size,
+ resp.columnIndex2TsBlockColumnIndexList,
)
else:
return None
@@ -1779,11 +1787,15 @@ class Session(object):
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
+ self.__session_id,
self.__client,
self.__statement_id,
- self.__session_id,
resp.queryResult,
resp.ignoreTimeStamp,
+ 0,
+ resp.moreData,
+ self.__fetch_size,
+ resp.columnIndex2TsBlockColumnIndexList,
)
def execute_last_data_query(self, paths: list, last_time: int) ->
SessionDataSet:
@@ -1820,11 +1832,15 @@ class Session(object):
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
- self.__client,
self.__statement_id,
+ self.__client,
self.__session_id,
resp.queryResult,
resp.ignoreTimeStamp,
+ 0,
+ resp.moreData,
+ self.__fetch_size,
+ resp.columnIndex2TsBlockColumnIndexList,
)
def insert_string_records_of_one_device(
diff --git a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
index c9e45304707..d91d0b4d7a3 100644
--- a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
+++ b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
@@ -17,12 +17,10 @@
#
# for package
-import binascii
import logging
import numpy as np
import pandas as pd
-from datetime import date
from thrift.transport import TTransport
from iotdb.thrift.rpc.IClientRPCService import TSFetchResultsReq,
TSCloseOperationReq
from iotdb.tsfile.utils.date_utils import parse_int_to_date
@@ -31,16 +29,10 @@ from iotdb.utils.IoTDBConnectionException import IoTDBConnectionException
from iotdb.utils.IoTDBConstants import TSDataType
logger = logging.getLogger("IoTDB")
-
-
-def _to_bitbuffer(b):
- return bytes("{:0{}b}".format(int(binascii.hexlify(b), 16), 8 * len(b)),
"utf-8")
+TIMESTAMP_STR = "Time"
class IoTDBRpcDataSet(object):
- TIMESTAMP_STR = "Time"
- # VALUE_IS_NULL = "The value got by %s (column name) is NULL."
- START_INDEX = 2
def __init__(
self,
@@ -49,12 +41,15 @@ class IoTDBRpcDataSet(object):
column_type_list,
column_name_index,
ignore_timestamp,
+ more_data,
query_id,
client,
statement_id,
session_id,
query_result,
fetch_size,
+ time_out,
+ column_index_2_tsblock_column_index_list,
):
self.__statement_id = statement_id
self.__session_id = session_id
@@ -64,46 +59,56 @@ class IoTDBRpcDataSet(object):
self.__client = client
self.__fetch_size = fetch_size
self.column_size = len(column_name_list)
- self.__default_time_out = 1000
+ self.__time_out = time_out
self.__column_name_list = []
self.__column_type_list = []
self.column_ordinal_dict = {}
+ self.column_name_2_tsblock_column_index_dict = {}
+ column_start_index = 1
+
+ start_index_for_column_index_2_tsblock_column_index_list = 0
if not ignore_timestamp:
- self.__column_name_list.append(IoTDBRpcDataSet.TIMESTAMP_STR)
+ self.__column_name_list.append(TIMESTAMP_STR)
self.__column_type_list.append(TSDataType.INT64)
- self.column_ordinal_dict[IoTDBRpcDataSet.TIMESTAMP_STR] = 1
-
- if column_name_index is not None:
- self.column_type_deduplicated_list = [
- None for _ in range(len(column_name_index))
- ]
- for i in range(self.column_size):
- name = column_name_list[i]
- self.__column_name_list.append(name)
- self.__column_type_list.append(TSDataType[column_type_list[i]])
- if name not in self.column_ordinal_dict:
- index = column_name_index[name]
- self.column_ordinal_dict[name] = index +
IoTDBRpcDataSet.START_INDEX
- self.column_type_deduplicated_list[index] = TSDataType[
- column_type_list[i]
- ]
- else:
- index = IoTDBRpcDataSet.START_INDEX
- self.column_type_deduplicated_list = []
+ self.column_name_2_tsblock_column_index_dict[TIMESTAMP_STR] = -1
+ self.column_ordinal_dict[TIMESTAMP_STR] = 1
+ if column_index_2_tsblock_column_index_list is not None:
+ column_index_2_tsblock_column_index_list.insert(0, -1)
+ start_index_for_column_index_2_tsblock_column_index_list = 1
+ column_start_index += 1
+
+ if column_index_2_tsblock_column_index_list is None:
+ column_index_2_tsblock_column_index_list = []
+ if not ignore_timestamp:
+ start_index_for_column_index_2_tsblock_column_index_list = 1
+ column_index_2_tsblock_column_index_list.append(-1)
for i in range(len(column_name_list)):
- name = column_name_list[i]
- self.__column_name_list.append(name)
- self.__column_type_list.append(TSDataType[column_type_list[i]])
- if name not in self.column_ordinal_dict:
- self.column_ordinal_dict[name] = index
- index += 1
- self.column_type_deduplicated_list.append(
- TSDataType[column_type_list[i]]
- )
+ column_index_2_tsblock_column_index_list.append(i)
+ ts_block_column_size = (
+ max(column_index_2_tsblock_column_index_list, default=0) + 1
+ )
+ self.__data_type_for_tsblock_column = [None] * ts_block_column_size
+ for i in range(len(column_name_list)):
+ name = column_name_list[i]
+ column_type = TSDataType[column_type_list[i]]
+ self.__column_name_list.append(name)
+ self.__column_type_list.append(column_type)
+ tsblock_column_index = column_index_2_tsblock_column_index_list[
+ start_index_for_column_index_2_tsblock_column_index_list + i
+ ]
+ if tsblock_column_index != -1:
+ self.__data_type_for_tsblock_column[tsblock_column_index] =
column_type
+ if name not in self.column_name_2_tsblock_column_index_dict:
+ self.column_ordinal_dict[name] = i + column_start_index
+ self.column_name_2_tsblock_column_index_dict[name] = (
+ tsblock_column_index
+ )
+
+ self.__column_index_2_tsblock_column_index_list = (
+ column_index_2_tsblock_column_index_list
+ )
self.__query_result = query_result
- if query_result is not None:
- self.__query_result_size = len(query_result)
self.__query_result_index = 0
self.__is_closed = False
self.__empty_resultSet = False
@@ -127,7 +132,7 @@ class IoTDBRpcDataSet(object):
)
)
except TTransport.TException as e:
- raise RuntimeError(
+ raise IoTDBConnectionException(
"close session {} failed because:
".format(self.__session_id), e
)
@@ -172,12 +177,10 @@ class IoTDBRpcDataSet(object):
else:
column_name = self.__column_name_list[i + 1]
- location = (
- self.column_ordinal_dict[column_name] -
IoTDBRpcDataSet.START_INDEX
- )
+ location =
self.column_name_2_tsblock_column_index_dict[column_name]
if location < 0:
continue
- data_type = self.column_type_deduplicated_list[location]
+ data_type = self.__data_type_for_tsblock_column[location]
value_buffer = column_values[location]
value_buffer_len = len(value_buffer)
# DOUBLE
@@ -279,7 +282,7 @@ class IoTDBRpcDataSet(object):
time_array.dtype.newbyteorder("<")
)
if self.ignore_timestamp is None or self.ignore_timestamp is False:
- result[IoTDBRpcDataSet.TIMESTAMP_STR].append(time_array)
+ result[TIMESTAMP_STR].append(time_array)
total_length = len(time_array)
@@ -289,12 +292,10 @@ class IoTDBRpcDataSet(object):
else:
column_name = self.__column_name_list[i + 1]
- location = (
- self.column_ordinal_dict[column_name] -
IoTDBRpcDataSet.START_INDEX
- )
+ location =
self.column_name_2_tsblock_column_index_dict[column_name]
if location < 0:
continue
- data_type = self.column_type_deduplicated_list[location]
+ data_type = self.__data_type_for_tsblock_column[location]
value_buffer = column_values[location]
value_buffer_len = len(value_buffer)
# DOUBLE
@@ -420,7 +421,7 @@ class IoTDBRpcDataSet(object):
self.__fetch_size,
self.__query_id,
True,
- self.__default_time_out,
+ self.__time_out,
)
try:
resp = self.__client.fetchResultsV2(request)
@@ -428,6 +429,7 @@ class IoTDBRpcDataSet(object):
self.__empty_resultSet = True
else:
self.__query_result = resp.queryResult
+ self.__query_result_index = 0
return resp.hasResultSet
except TTransport.TException as e:
raise RuntimeError(
diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
index 50dc123aa57..5ef69118330 100644
--- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
+++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
@@ -37,11 +37,15 @@ class SessionDataSet(object):
column_type_list,
column_name_index,
query_id,
- client,
statement_id,
+ client,
session_id,
query_result,
ignore_timestamp,
+ time_out,
+ more_data,
+ fetch_size,
+ column_index_2_tsblock_column_index_list,
):
self.iotdb_rpc_data_set = IoTDBRpcDataSet(
sql,
@@ -49,21 +53,17 @@ class SessionDataSet(object):
column_type_list,
column_name_index,
ignore_timestamp,
+ more_data,
query_id,
client,
statement_id,
session_id,
query_result,
- 5000,
- )
- self.column_size = self.iotdb_rpc_data_set.column_size
- self.is_ignore_timestamp = self.iotdb_rpc_data_set.ignore_timestamp
- self.column_names = tuple(self.iotdb_rpc_data_set.get_column_names())
- self.column_ordinal_dict = self.iotdb_rpc_data_set.column_ordinal_dict
- self.column_type_deduplicated_list = tuple(
- self.iotdb_rpc_data_set.column_type_deduplicated_list
+ fetch_size,
+ time_out,
+ column_index_2_tsblock_column_index_list,
)
- if self.is_ignore_timestamp:
+ if ignore_timestamp:
self.__field_list = [
Field(data_type)
for data_type in self.iotdb_rpc_data_set.get_column_types()