This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/pythonClient
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f840f0dc5937313aec363424331ce4aaff965b2d
Author: JackieTien97 <[email protected]>
AuthorDate: Sat Jan 17 22:10:13 2026 +0800

    Support stream DataFrame interface in iotdb python client
---
 .../client-py/iotdb/utils/SessionDataSet.py        | 19 ++++++++
 .../client-py/iotdb/utils/iotdb_rpc_dataset.py     | 53 ++++++++++++++++++++++
 iotdb-client/client-py/session_example.py          |  6 +++
 .../client-py/table_model_session_example.py       |  4 ++
 4 files changed, 82 insertions(+)

diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
index b079ee28c1a..67509d3c059 100644
--- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
+++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
@@ -143,6 +143,25 @@ class SessionDataSet(object):
     def close_operation_handle(self):
         self.iotdb_rpc_data_set.close()
 
+    def has_next_df(self):
+        """
+        Evaluate if there are more DataFrames to be fetched.
+        :return: whether there are more DataFrames to be fetched
+        """
+        # Check if buffer has data or if there are more results to fetch
+        rpc_ds = self.iotdb_rpc_data_set
+        has_buffer = rpc_ds._IoTDBRpcDataSet__df_buffer is not None and len(rpc_ds._IoTDBRpcDataSet__df_buffer) > 0
+        return has_buffer or rpc_ds._has_next_result_set()
+
+    def next_df(self):
+        """
+        Get the next DataFrame from the result set.
+        Each returned DataFrame contains exactly fetch_size rows,
+        except for the last DataFrame which may contain fewer rows.
+        :return: the next DataFrame, or None if no more data
+        """
+        return self.iotdb_rpc_data_set.next_dataframe()
+
     def todf(self) -> pd.DataFrame:
         return result_set_to_pandas(self)
 
diff --git a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
index dc771368669..5afa8b152a5 100644
--- a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
+++ b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
@@ -120,6 +120,7 @@ class IoTDBRpcDataSet(object):
         self.data_frame = None
         self.__zone_id = zone_id
         self.__time_precision = time_precision
+        self.__df_buffer = None  # Buffer for streaming DataFrames
 
     def close(self):
         if self.__is_closed:
@@ -243,11 +244,60 @@ class IoTDBRpcDataSet(object):
             return True
         return False
 
+    def next_dataframe(self):
+        """
+        Get the next DataFrame from the result set with exactly fetch_size rows.
+        The last DataFrame may have fewer rows.
+        :return: the next DataFrame with fetch_size rows, or None if no more data
+        """
+        # Accumulate data until we have at least fetch_size rows or no more data
+        while True:
+            buffer_len = 0 if self.__df_buffer is None else len(self.__df_buffer)
+            if buffer_len >= self.__fetch_size:
+                # We have enough rows, return a chunk
+                break
+            if not self._has_next_result_set():
+                # No more data to fetch
+                break
+            # Process and accumulate
+            result = self._process_buffer()
+            new_df = self._build_dataframe(result)
+            if self.__df_buffer is None:
+                self.__df_buffer = new_df
+            else:
+                self.__df_buffer = pd.concat([self.__df_buffer, new_df], ignore_index=True)
+
+        if self.__df_buffer is None or len(self.__df_buffer) == 0:
+            return None
+
+        if len(self.__df_buffer) <= self.__fetch_size:
+            # Return all remaining rows
+            result_df = self.__df_buffer
+            self.__df_buffer = None
+            return result_df
+        else:
+            # Slice off fetch_size rows
+            result_df = self.__df_buffer.iloc[:self.__fetch_size].reset_index(drop=True)
+            self.__df_buffer = self.__df_buffer.iloc[self.__fetch_size:].reset_index(drop=True)
+            return result_df
+
     def result_set_to_pandas(self):
         result = {}
         for i in range(len(self.__column_index_2_tsblock_column_index_list)):
             result[i] = []
         while self._has_next_result_set():
+            batch_result = self._process_buffer()
+            for k, v in batch_result.items():
+                result[k].extend(v)
+
+        return self._build_dataframe(result)
+
+    def _process_buffer(self):
+        result = {}
+        for i in range(len(self.__column_index_2_tsblock_column_index_list)):
+            result[i] = []
+
+        while self.__query_result_index < len(self.__query_result):
             time_array, column_arrays, null_indicators, array_length = deserialize(
                 memoryview(self.__query_result[self.__query_result_index])
             )
@@ -339,6 +389,9 @@ class IoTDBRpcDataSet(object):
 
                 result[i].append(data_array)
 
+        return result
+
+    def _build_dataframe(self, result):
         for k, v in result.items():
             if v is None or len(v) < 1 or v[0] is None:
                 result[k] = []
diff --git a/iotdb-client/client-py/session_example.py b/iotdb-client/client-py/session_example.py
index d0a6a3aba8e..996339c0088 100644
--- a/iotdb-client/client-py/session_example.py
+++ b/iotdb-client/client-py/session_example.py
@@ -411,6 +411,12 @@ with session.execute_query_statement(
     df = dataset.todf()
     print(df.to_string())
 
+with session.execute_query_statement(
+    "select s_01,s_02,s_03,s_04 from root.sg_test_01.d_04"
+) as dataset:
+    while dataset.has_next_df():
+        print(dataset.next_df())
+
 # delete database
 session.delete_storage_group("root.sg_test_01")
 
diff --git a/iotdb-client/client-py/table_model_session_example.py b/iotdb-client/client-py/table_model_session_example.py
index c9aa62b97a0..048f1022b76 100644
--- a/iotdb-client/client-py/table_model_session_example.py
+++ b/iotdb-client/client-py/table_model_session_example.py
@@ -158,5 +158,9 @@ with session.execute_query_statement("select * from table5 order by time") as da
     df = dataset.todf()
     print(df)
 
+with session.execute_query_statement("select * from table5 order by time") as dataset:
+    while dataset.has_next_df():
+        print(dataset.next_df())
+
 # close session connection.
 session.close()

Reply via email to