This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch query_v2_py
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/query_v2_py by this push:
     new 6c2fd00c9ce dev more
6c2fd00c9ce is described below

commit 6c2fd00c9ced8f5ddd1ed2e299a8d6b2094f4639
Author: HTHou <[email protected]>
AuthorDate: Thu Mar 20 23:53:29 2025 +0800

    dev more
---
 iotdb-client/client-py/iotdb/Session.py            |  24 ++++-
 .../client-py/iotdb/utils/IoTDBRpcDataSet.py       | 106 +++++++++++----------
 .../client-py/iotdb/utils/SessionDataSet.py        |  20 ++--
 3 files changed, 84 insertions(+), 66 deletions(-)

diff --git a/iotdb-client/client-py/iotdb/Session.py b/iotdb-client/client-py/iotdb/Session.py
index 2796c417d47..dd49958f3a8 100644
--- a/iotdb-client/client-py/iotdb/Session.py
+++ b/iotdb-client/client-py/iotdb/Session.py
@@ -1502,11 +1502,15 @@ class Session(object):
             resp.dataTypeList,
             resp.columnNameIndexMap,
             resp.queryId,
+            self.__session_id,
             self.__client,
             self.__statement_id,
-            self.__session_id,
             resp.queryResult,
             resp.ignoreTimeStamp,
+            timeout,
+            resp.moreData,
+            self.__fetch_size,
+            resp.columnIndex2TsBlockColumnIndexList,
         )
 
     def execute_non_query_statement(self, sql):
@@ -1567,11 +1571,15 @@ class Session(object):
                 resp.dataTypeList,
                 resp.columnNameIndexMap,
                 resp.queryId,
+                self.__session_id,
                 self.__client,
                 self.__statement_id,
-                self.__session_id,
                 resp.queryResult,
                 resp.ignoreTimeStamp,
+                timeout,
+                resp.moreData,
+                self.__fetch_size,
+                resp.columnIndex2TsBlockColumnIndexList,
             )
         else:
             return None
@@ -1779,11 +1787,15 @@ class Session(object):
             resp.dataTypeList,
             resp.columnNameIndexMap,
             resp.queryId,
+            self.__session_id,
             self.__client,
             self.__statement_id,
-            self.__session_id,
             resp.queryResult,
             resp.ignoreTimeStamp,
+            0,
+            resp.moreData,
+            self.__fetch_size,
+            resp.columnIndex2TsBlockColumnIndexList,
         )
 
     def execute_last_data_query(self, paths: list, last_time: int) -> SessionDataSet:
@@ -1820,11 +1832,15 @@ class Session(object):
             resp.dataTypeList,
             resp.columnNameIndexMap,
             resp.queryId,
-            self.__client,
             self.__statement_id,
+            self.__client,
             self.__session_id,
             resp.queryResult,
             resp.ignoreTimeStamp,
+            0,
+            resp.moreData,
+            self.__fetch_size,
+            resp.columnIndex2TsBlockColumnIndexList,
         )
 
     def insert_string_records_of_one_device(
diff --git a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
index c9e45304707..d91d0b4d7a3 100644
--- a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
+++ b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
@@ -17,12 +17,10 @@
 #
 
 # for package
-import binascii
 import logging
 
 import numpy as np
 import pandas as pd
-from datetime import date
 from thrift.transport import TTransport
 from iotdb.thrift.rpc.IClientRPCService import TSFetchResultsReq, TSCloseOperationReq
 from iotdb.tsfile.utils.date_utils import parse_int_to_date
@@ -31,16 +29,10 @@ from iotdb.utils.IoTDBConnectionException import IoTDBConnectionException
 from iotdb.utils.IoTDBConstants import TSDataType
 
 logger = logging.getLogger("IoTDB")
-
-
-def _to_bitbuffer(b):
-    return bytes("{:0{}b}".format(int(binascii.hexlify(b), 16), 8 * len(b)), "utf-8")
+TIMESTAMP_STR = "Time"
 
 
 class IoTDBRpcDataSet(object):
-    TIMESTAMP_STR = "Time"
-    # VALUE_IS_NULL = "The value got by %s (column name) is NULL."
-    START_INDEX = 2
 
     def __init__(
         self,
@@ -49,12 +41,15 @@ class IoTDBRpcDataSet(object):
         column_type_list,
         column_name_index,
         ignore_timestamp,
+        more_data,
         query_id,
         client,
         statement_id,
         session_id,
         query_result,
         fetch_size,
+        time_out,
+        column_index_2_tsblock_column_index_list,
     ):
         self.__statement_id = statement_id
         self.__session_id = session_id
@@ -64,46 +59,56 @@ class IoTDBRpcDataSet(object):
         self.__client = client
         self.__fetch_size = fetch_size
         self.column_size = len(column_name_list)
-        self.__default_time_out = 1000
+        self.__time_out = time_out
 
         self.__column_name_list = []
         self.__column_type_list = []
         self.column_ordinal_dict = {}
+        self.column_name_2_tsblock_column_index_dict = {}
+        column_start_index = 1
+
+        start_index_for_column_index_2_tsblock_column_index_list = 0
         if not ignore_timestamp:
-            self.__column_name_list.append(IoTDBRpcDataSet.TIMESTAMP_STR)
+            self.__column_name_list.append(TIMESTAMP_STR)
             self.__column_type_list.append(TSDataType.INT64)
-            self.column_ordinal_dict[IoTDBRpcDataSet.TIMESTAMP_STR] = 1
-
-        if column_name_index is not None:
-            self.column_type_deduplicated_list = [
-                None for _ in range(len(column_name_index))
-            ]
-            for i in range(self.column_size):
-                name = column_name_list[i]
-                self.__column_name_list.append(name)
-                self.__column_type_list.append(TSDataType[column_type_list[i]])
-                if name not in self.column_ordinal_dict:
-                    index = column_name_index[name]
-                    self.column_ordinal_dict[name] = index + IoTDBRpcDataSet.START_INDEX
-                    self.column_type_deduplicated_list[index] = TSDataType[
-                        column_type_list[i]
-                    ]
-        else:
-            index = IoTDBRpcDataSet.START_INDEX
-            self.column_type_deduplicated_list = []
+            self.column_name_2_tsblock_column_index_dict[TIMESTAMP_STR] = -1
+            self.column_ordinal_dict[TIMESTAMP_STR] = 1
+            if column_index_2_tsblock_column_index_list is not None:
+                column_index_2_tsblock_column_index_list.insert(0, -1)
+                start_index_for_column_index_2_tsblock_column_index_list = 1
+            column_start_index += 1
+
+        if column_index_2_tsblock_column_index_list is None:
+            column_index_2_tsblock_column_index_list = []
+            if not ignore_timestamp:
+                start_index_for_column_index_2_tsblock_column_index_list = 1
+                column_index_2_tsblock_column_index_list.append(-1)
             for i in range(len(column_name_list)):
-                name = column_name_list[i]
-                self.__column_name_list.append(name)
-                self.__column_type_list.append(TSDataType[column_type_list[i]])
-                if name not in self.column_ordinal_dict:
-                    self.column_ordinal_dict[name] = index
-                    index += 1
-                    self.column_type_deduplicated_list.append(
-                        TSDataType[column_type_list[i]]
-                    )
+                column_index_2_tsblock_column_index_list.append(i)
+        ts_block_column_size = (
+            max(column_index_2_tsblock_column_index_list, default=0) + 1
+        )
+        self.__data_type_for_tsblock_column = [None] * ts_block_column_size
+        for i in range(len(column_name_list)):
+            name = column_name_list[i]
+            column_type = TSDataType[column_type_list[i]]
+            self.__column_name_list.append(name)
+            self.__column_type_list.append(column_type)
+            tsblock_column_index = column_index_2_tsblock_column_index_list[
+                start_index_for_column_index_2_tsblock_column_index_list + i
+            ]
+            if tsblock_column_index != -1:
+                self.__data_type_for_tsblock_column[tsblock_column_index] = column_type
+            if name not in self.column_name_2_tsblock_column_index_dict:
+                self.column_ordinal_dict[name] = i + column_start_index
+                self.column_name_2_tsblock_column_index_dict[name] = (
+                    tsblock_column_index
+                )
+
+        self.__column_index_2_tsblock_column_index_list = (
+            column_index_2_tsblock_column_index_list
+        )
         self.__query_result = query_result
-        if query_result is not None:
-            self.__query_result_size = len(query_result)
         self.__query_result_index = 0
         self.__is_closed = False
         self.__empty_resultSet = False
@@ -127,7 +132,7 @@ class IoTDBRpcDataSet(object):
                     )
                 )
             except TTransport.TException as e:
-                raise RuntimeError(
+                raise IoTDBConnectionException(
                     "close session {} failed because: ".format(self.__session_id), e
                 )
 
@@ -172,12 +177,10 @@ class IoTDBRpcDataSet(object):
             else:
                 column_name = self.__column_name_list[i + 1]
 
-            location = (
-                self.column_ordinal_dict[column_name] - IoTDBRpcDataSet.START_INDEX
-            )
+            location = self.column_name_2_tsblock_column_index_dict[column_name]
             if location < 0:
                 continue
-            data_type = self.column_type_deduplicated_list[location]
+            data_type = self.__data_type_for_tsblock_column[location]
             value_buffer = column_values[location]
             value_buffer_len = len(value_buffer)
             # DOUBLE
@@ -279,7 +282,7 @@ class IoTDBRpcDataSet(object):
                     time_array.dtype.newbyteorder("<")
                 )
             if self.ignore_timestamp is None or self.ignore_timestamp is False:
-                result[IoTDBRpcDataSet.TIMESTAMP_STR].append(time_array)
+                result[TIMESTAMP_STR].append(time_array)
 
             total_length = len(time_array)
 
@@ -289,12 +292,10 @@ class IoTDBRpcDataSet(object):
                 else:
                     column_name = self.__column_name_list[i + 1]
 
-                location = (
-                    self.column_ordinal_dict[column_name] - IoTDBRpcDataSet.START_INDEX
-                )
+                location = self.column_name_2_tsblock_column_index_dict[column_name]
                 if location < 0:
                     continue
-                data_type = self.column_type_deduplicated_list[location]
+                data_type = self.__data_type_for_tsblock_column[location]
                 value_buffer = column_values[location]
                 value_buffer_len = len(value_buffer)
                 # DOUBLE
@@ -420,7 +421,7 @@ class IoTDBRpcDataSet(object):
             self.__fetch_size,
             self.__query_id,
             True,
-            self.__default_time_out,
+            self.__time_out,
         )
         try:
             resp = self.__client.fetchResultsV2(request)
@@ -428,6 +429,7 @@ class IoTDBRpcDataSet(object):
                 self.__empty_resultSet = True
             else:
                 self.__query_result = resp.queryResult
+                self.__query_result_index = 0
             return resp.hasResultSet
         except TTransport.TException as e:
             raise RuntimeError(
diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
index 50dc123aa57..5ef69118330 100644
--- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
+++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
@@ -37,11 +37,15 @@ class SessionDataSet(object):
         column_type_list,
         column_name_index,
         query_id,
-        client,
         statement_id,
+        client,
         session_id,
         query_result,
         ignore_timestamp,
+        time_out,
+        more_data,
+        fetch_size,
+        column_index_2_tsblock_column_index_list,
     ):
         self.iotdb_rpc_data_set = IoTDBRpcDataSet(
             sql,
@@ -49,21 +53,17 @@ class SessionDataSet(object):
             column_type_list,
             column_name_index,
             ignore_timestamp,
+            more_data,
             query_id,
             client,
             statement_id,
             session_id,
             query_result,
-            5000,
-        )
-        self.column_size = self.iotdb_rpc_data_set.column_size
-        self.is_ignore_timestamp = self.iotdb_rpc_data_set.ignore_timestamp
-        self.column_names = tuple(self.iotdb_rpc_data_set.get_column_names())
-        self.column_ordinal_dict = self.iotdb_rpc_data_set.column_ordinal_dict
-        self.column_type_deduplicated_list = tuple(
-            self.iotdb_rpc_data_set.column_type_deduplicated_list
+            fetch_size,
+            time_out,
+            column_index_2_tsblock_column_index_list,
         )
-        if self.is_ignore_timestamp:
+        if ignore_timestamp:
             self.__field_list = [
                 Field(data_type)
                 for data_type in self.iotdb_rpc_data_set.get_column_types()

Reply via email to