This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/pythonClient in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f840f0dc5937313aec363424331ce4aaff965b2d
Author: JackieTien97 <[email protected]>
AuthorDate: Sat Jan 17 22:10:13 2026 +0800

    Support stream DataFrame interface in iotdb python client
---
 .../client-py/iotdb/utils/SessionDataSet.py        | 19 ++++++++
 .../client-py/iotdb/utils/iotdb_rpc_dataset.py     | 53 ++++++++++++++++++++++
 iotdb-client/client-py/session_example.py          |  6 +++
 .../client-py/table_model_session_example.py       |  4 ++
 4 files changed, 82 insertions(+)

NOTE(review): this text sits between the diffstat and the first diff; git am
discards it. It is not part of the commit.

NOTE(review) summary: the patch adds a streaming DataFrame API.
  - SessionDataSet.has_next_df() / next_df() are backed by
    IoTDBRpcDataSet.next_dataframe() and a new row buffer (__df_buffer).
  - Each returned DataFrame has fetch_size rows; the last one may be shorter.
  - result_set_to_pandas() is split into two helpers:
    - _process_buffer() decodes every block currently held in __query_result.
    - _build_dataframe() turns the per-column lists into a DataFrame.
  - Both example scripts gain a `while has_next_df(): print(next_df())` loop.

NOTE(review) findings:
  - has_next_df() reads rpc_ds._IoTDBRpcDataSet__df_buffer, the name-mangled
    form of IoTDBRpcDataSet's private __df_buffer. It also calls the private
    rpc_ds._has_next_result_set(). Renaming IoTDBRpcDataSet would turn the
    attribute read into an AttributeError. Consider a public
    IoTDBRpcDataSet.has_next_dataframe() and delegating to it.
  - If fetch_size <= 0, next_dataframe() leaves its fill loop at once
    (buffer_len 0 >= fetch_size) and returns None without consuming data.
    If _has_next_result_set() keeps returning True in that state, both example
    loops never terminate. Presumably fetch_size is always positive; confirm,
    or guard it.
  - has_next_df() only checks that more fetched data exists, not that it
    yields rows. If the remaining data decodes to an empty frame, next_df()
    returns None and the example loops print "None" once. TODO confirm whether
    empty blocks can occur.
  - IoTDBRpcDataSet.result_set_to_pandas() starts from a fresh dict and never
    reads __df_buffer. Calling it after next_dataframe() has buffered rows
    therefore drops those rows.
  - When a fetch yields many times fetch_size rows, every next_dataframe()
    call rebuilds the remainder with iloc[fetch_size:].reset_index(drop=True).
  - This archived copy shows 18 and 47 added lines for the first two hunks,
    but their headers declare 19 and 49. Blank "+" lines appear to have been
    lost in this rendering, so this copy may not apply as-is.

diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
index b079ee28c1a..67509d3c059 100644
--- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
+++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
@@ -143,6 +143,25 @@ class SessionDataSet(object):
     def close_operation_handle(self):
         self.iotdb_rpc_data_set.close()
 
+    def has_next_df(self):
+        """
+        Evaluate if there are more DataFrames to be fetched.
+        :return: whether there are more DataFrames to be fetched
+        """
+        # Check if buffer has data or if there are more results to fetch
+        rpc_ds = self.iotdb_rpc_data_set
+        has_buffer = rpc_ds._IoTDBRpcDataSet__df_buffer is not None and len(rpc_ds._IoTDBRpcDataSet__df_buffer) > 0
+        return has_buffer or rpc_ds._has_next_result_set()
+    def next_df(self):
+        """
+        Get the next DataFrame from the result set.
+        Each returned DataFrame contains exactly fetch_size rows,
+        except for the last DataFrame which may contain fewer rows.
+        :return: the next DataFrame, or None if no more data
+        """
+        return self.iotdb_rpc_data_set.next_dataframe()
+
     def todf(self) -> pd.DataFrame:
         return result_set_to_pandas(self)
 
diff --git a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
index dc771368669..5afa8b152a5 100644
--- a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
+++ b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
@@ -120,6 +120,7 @@ class IoTDBRpcDataSet(object):
         self.data_frame = None
         self.__zone_id = zone_id
         self.__time_precision = time_precision
+        self.__df_buffer = None  # Buffer for streaming DataFrames
 
     def close(self):
         if self.__is_closed:
@@ -243,11 +244,60 @@ class IoTDBRpcDataSet(object):
                 return True
             return False
 
+    def next_dataframe(self):
+        """
+        Get the next DataFrame from the result set with exactly fetch_size rows.
+        The last DataFrame may have fewer rows.
+        :return: the next DataFrame with fetch_size rows, or None if no more data
+        """
+        # Accumulate data until we have at least fetch_size rows or no more data
+        while True:
+            buffer_len = 0 if self.__df_buffer is None else len(self.__df_buffer)
+            if buffer_len >= self.__fetch_size:
+                # We have enough rows, return a chunk
+                break
+            if not self._has_next_result_set():
+                # No more data to fetch
+                break
+            # Process and accumulate
+            result = self._process_buffer()
+            new_df = self._build_dataframe(result)
+            if self.__df_buffer is None:
+                self.__df_buffer = new_df
+            else:
+                self.__df_buffer = pd.concat([self.__df_buffer, new_df], ignore_index=True)
+        if self.__df_buffer is None or len(self.__df_buffer) == 0:
+            return None
+        if len(self.__df_buffer) <= self.__fetch_size:
+            # Return all remaining rows
+            result_df = self.__df_buffer
+            self.__df_buffer = None
+            return result_df
+        else:
+            # Slice off fetch_size rows
+            result_df = self.__df_buffer.iloc[:self.__fetch_size].reset_index(drop=True)
+            self.__df_buffer = self.__df_buffer.iloc[self.__fetch_size:].reset_index(drop=True)
+            return result_df
+
     def result_set_to_pandas(self):
         result = {}
         for i in range(len(self.__column_index_2_tsblock_column_index_list)):
             result[i] = []
         while self._has_next_result_set():
+            batch_result = self._process_buffer()
+            for k, v in batch_result.items():
+                result[k].extend(v)
+
+        return self._build_dataframe(result)
+
+    def _process_buffer(self):
+        result = {}
+        for i in range(len(self.__column_index_2_tsblock_column_index_list)):
+            result[i] = []
+
+        while self.__query_result_index < len(self.__query_result):
             time_array, column_arrays, null_indicators, array_length = deserialize(
                 memoryview(self.__query_result[self.__query_result_index])
             )
@@ -339,6 +389,9 @@ class IoTDBRpcDataSet(object):
 
                 result[i].append(data_array)
 
+        return result
+
+    def _build_dataframe(self, result):
         for k, v in result.items():
             if v is None or len(v) < 1 or v[0] is None:
                 result[k] = []
diff --git a/iotdb-client/client-py/session_example.py b/iotdb-client/client-py/session_example.py
index d0a6a3aba8e..996339c0088 100644
--- a/iotdb-client/client-py/session_example.py
+++ b/iotdb-client/client-py/session_example.py
@@ -411,6 +411,12 @@ with session.execute_query_statement(
     df = dataset.todf()
     print(df.to_string())
 
+with session.execute_query_statement(
+    "select s_01,s_02,s_03,s_04 from root.sg_test_01.d_04"
+) as dataset:
+    while dataset.has_next_df():
+        print(dataset.next_df())
+
 # delete database
 session.delete_storage_group("root.sg_test_01")
 
diff --git a/iotdb-client/client-py/table_model_session_example.py b/iotdb-client/client-py/table_model_session_example.py
index c9aa62b97a0..048f1022b76 100644
--- a/iotdb-client/client-py/table_model_session_example.py
+++ b/iotdb-client/client-py/table_model_session_example.py
@@ -158,5 +158,9 @@ with session.execute_query_statement("select * from table5 order by time") as da
     df = dataset.todf()
     print(df)
 
+with session.execute_query_statement("select * from table5 order by time") as dataset:
+    while dataset.has_next_df():
+        print(dataset.next_df())
+
 # close session connection.
 session.close()
