This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e46872766b Support stream DataFrame interface in iotdb python client (#17035)
2e46872766b is described below

commit 2e46872766be071c5fe583414899f5fe5ff87500
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jan 27 10:41:28 2026 +0800

    Support stream DataFrame interface in iotdb python client (#17035)
    
    * Support stream DataFrame interface in iotdb python client
    
    * change according to code review
    
    * format code
    
    * Bug fix "optional"
    
    * add IT
    
    * fix illegal fetch_size
    
    ---------
    
    Co-authored-by: Yongzao <[email protected]>
    Co-authored-by: HTHou <[email protected]>
---
 iotdb-client/client-py/iotdb/Session.py            |  8 ++-
 .../client-py/iotdb/utils/SessionDataSet.py        | 19 ++++++
 .../client-py/iotdb/utils/iotdb_rpc_dataset.py     | 68 ++++++++++++++++++++++
 iotdb-client/client-py/session_example.py          |  6 ++
 .../client-py/table_model_session_example.py       |  4 ++
 .../client-py/tests/integration/test_dataframe.py  | 50 ++++++++++++++++
 6 files changed, 154 insertions(+), 1 deletion(-)

diff --git a/iotdb-client/client-py/iotdb/Session.py b/iotdb-client/client-py/iotdb/Session.py
index 3f44bd41584..a3d89d71746 100644
--- a/iotdb-client/client-py/iotdb/Session.py
+++ b/iotdb-client/client-py/iotdb/Session.py
@@ -95,7 +95,13 @@ class Session(object):
         self.__default_endpoint = TEndPoint(self.__host, self.__port)
         self.__user = user
         self.__password = password
-        self.__fetch_size = fetch_size
+        if fetch_size > 0:
+            self.__fetch_size = fetch_size
+        else:
+            logger.warning(
+                f"fetch_size {fetch_size} is illegal, use default fetch_size {self.DEFAULT_FETCH_SIZE}"
+            )
+            self.__fetch_size = self.DEFAULT_FETCH_SIZE
         self.__is_close = True
         self.__client = None
         self.__default_connection = None
diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
index b079ee28c1a..9c5b3ec06dd 100644
--- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
+++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
@@ -16,6 +16,7 @@
 # under the License.
 #
 import logging
+from typing import Optional
 
 from iotdb.utils.Field import Field
 
@@ -143,6 +144,24 @@ class SessionDataSet(object):
     def close_operation_handle(self):
         self.iotdb_rpc_data_set.close()
 
+    def has_next_df(self) -> bool:
+        """
+        Evaluate if there are more DataFrames to be fetched.
+        :return: whether there are more DataFrames to be fetched
+        """
+        # Check if buffer has data or if there are more results to fetch
+        rpc_ds = self.iotdb_rpc_data_set
+        return rpc_ds._has_buffered_data() or rpc_ds._has_next_result_set()
+
+    def next_df(self) -> Optional[pd.DataFrame]:
+        """
+        Get the next DataFrame from the result set.
+        Each returned DataFrame contains exactly fetch_size rows,
+        except for the last DataFrame which may contain fewer rows.
+        :return: the next DataFrame, or None if no more data
+        """
+        return self.iotdb_rpc_data_set.next_dataframe()
+
     def todf(self) -> pd.DataFrame:
         return result_set_to_pandas(self)
 
diff --git a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
index dc771368669..0edc76f68fd 100644
--- a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
+++ b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
@@ -18,6 +18,7 @@
 
 # for package
 import logging
+from typing import Optional
 
 import numpy as np
 import pandas as pd
@@ -120,10 +121,12 @@ class IoTDBRpcDataSet(object):
         self.data_frame = None
         self.__zone_id = zone_id
         self.__time_precision = time_precision
+        self.__df_buffer = None  # Buffer for streaming DataFrames
 
     def close(self):
         if self.__is_closed:
             return
+        self.__df_buffer = None  # Clean up streaming DataFrame buffer
         if self.__client is not None:
             try:
                 status = self.__client.closeOperation(
@@ -243,11 +246,73 @@ class IoTDBRpcDataSet(object):
             return True
         return False
 
+    def _has_buffered_data(self) -> bool:
+        """
+        Check if there is buffered data for streaming DataFrame interface.
+        :return: True if there is buffered data, False otherwise
+        """
+        return self.__df_buffer is not None and len(self.__df_buffer) > 0
+
+    def next_dataframe(self) -> Optional[pd.DataFrame]:
+        """
+        Get the next DataFrame from the result set with exactly fetch_size rows.
+        The last DataFrame may have fewer rows.
+        :return: the next DataFrame with fetch_size rows, or None if no more data
+        """
+        # Accumulate data until we have at least fetch_size rows or no more data
+        while True:
+            buffer_len = 0 if self.__df_buffer is None else len(self.__df_buffer)
+            if buffer_len >= self.__fetch_size:
+                # We have enough rows, return a chunk
+                break
+            if not self._has_next_result_set():
+                # No more data to fetch
+                break
+            # Process and accumulate
+            result = self._process_buffer()
+            new_df = self._build_dataframe(result)
+            if self.__df_buffer is None:
+                self.__df_buffer = new_df
+            else:
+                self.__df_buffer = pd.concat(
+                    [self.__df_buffer, new_df], ignore_index=True
+                )
+
+        if self.__df_buffer is None or len(self.__df_buffer) == 0:
+            return None
+
+        if len(self.__df_buffer) <= self.__fetch_size:
+            # Return all remaining rows
+            result_df = self.__df_buffer
+            self.__df_buffer = None
+            return result_df
+        else:
+            # Slice off fetch_size rows
+            result_df = self.__df_buffer.iloc[: self.__fetch_size].reset_index(
+                drop=True
+            )
+            self.__df_buffer = self.__df_buffer.iloc[self.__fetch_size :].reset_index(
+                drop=True
+            )
+            return result_df
+
     def result_set_to_pandas(self):
         result = {}
         for i in range(len(self.__column_index_2_tsblock_column_index_list)):
             result[i] = []
         while self._has_next_result_set():
+            batch_result = self._process_buffer()
+            for k, v in batch_result.items():
+                result[k].extend(v)
+
+        return self._build_dataframe(result)
+
+    def _process_buffer(self):
+        result = {}
+        for i in range(len(self.__column_index_2_tsblock_column_index_list)):
+            result[i] = []
+
+        while self.__query_result_index < len(self.__query_result):
             time_array, column_arrays, null_indicators, array_length = deserialize(
                 memoryview(self.__query_result[self.__query_result_index])
             )
@@ -339,6 +404,9 @@ class IoTDBRpcDataSet(object):
 
                 result[i].append(data_array)
 
+        return result
+
+    def _build_dataframe(self, result):
         for k, v in result.items():
             if v is None or len(v) < 1 or v[0] is None:
                 result[k] = []
diff --git a/iotdb-client/client-py/session_example.py b/iotdb-client/client-py/session_example.py
index d0a6a3aba8e..996339c0088 100644
--- a/iotdb-client/client-py/session_example.py
+++ b/iotdb-client/client-py/session_example.py
@@ -411,6 +411,12 @@ with session.execute_query_statement(
     df = dataset.todf()
     print(df.to_string())
 
+with session.execute_query_statement(
+    "select s_01,s_02,s_03,s_04 from root.sg_test_01.d_04"
+) as dataset:
+    while dataset.has_next_df():
+        print(dataset.next_df())
+
 # delete database
 session.delete_storage_group("root.sg_test_01")
 
diff --git a/iotdb-client/client-py/table_model_session_example.py b/iotdb-client/client-py/table_model_session_example.py
index c9aa62b97a0..048f1022b76 100644
--- a/iotdb-client/client-py/table_model_session_example.py
+++ b/iotdb-client/client-py/table_model_session_example.py
@@ -158,5 +158,9 @@ with session.execute_query_statement("select * from table5 order by time") as da
     df = dataset.todf()
     print(df)
 
+with session.execute_query_statement("select * from table5 order by time") as dataset:
+    while dataset.has_next_df():
+        print(dataset.next_df())
+
 # close session connection.
 session.close()
diff --git a/iotdb-client/client-py/tests/integration/test_dataframe.py b/iotdb-client/client-py/tests/integration/test_dataframe.py
index f314fbac184..61a243a23f4 100644
--- a/iotdb-client/client-py/tests/integration/test_dataframe.py
+++ b/iotdb-client/client-py/tests/integration/test_dataframe.py
@@ -42,6 +42,56 @@ def test_simple_query():
     assert_array_equal(df.values, [[123.0, 15.0]])
 
 
+def test_stream_query():
+    with IoTDBContainer("iotdb:dev") as db:
+        db: IoTDBContainer
+        session = Session(
+            db.get_container_host_ip(), db.get_exposed_port(6667), fetch_size=1
+        )
+        session.open(False)
+        session.execute_non_query_statement("CREATE DATABASE root.device0")
+
+        # Write data
+        session.insert_str_record("root.device0", 123, "pressure", "15.0")
+        session.insert_str_record("root.device0", 124, "pressure", "15.0")
+        session.insert_str_record("root.device0", 125, "pressure", "15.0")
+
+        # Read
+        session_data_set = session.execute_query_statement("SELECT * FROM root.device0")
+        index = 0
+        while session_data_set.has_next_df():
+            df = session_data_set.next_df()
+            assert list(df.columns) == ["Time", "root.device0.pressure"]
+            assert_array_equal(df.values, [[123.0 + index, 15.0]])
+            index += 1
+        session.close()
+        assert index == 3
+
+
+def test_stream_query_with_illegal_fetch_size():
+    with IoTDBContainer("iotdb:dev") as db:
+        db: IoTDBContainer
+        session = Session(
+            db.get_container_host_ip(), db.get_exposed_port(6667), fetch_size=-1
+        )
+        session.open(False)
+        session.execute_non_query_statement("CREATE DATABASE root.device0")
+
+        # Write data
+        session.insert_str_record("root.device0", 123, "pressure", "15.0")
+        session.insert_str_record("root.device0", 124, "pressure", "15.0")
+        session.insert_str_record("root.device0", 125, "pressure", "15.0")
+
+        # Read
+        session_data_set = session.execute_query_statement("SELECT * FROM root.device0")
+
+        while session_data_set.has_next_df():
+            df = session_data_set.next_df()
+            assert list(df.columns) == ["Time", "root.device0.pressure"]
+            assert_array_equal(df.values, [[123.0, 15.0], [124.0, 15.0], [125.0, 15.0]])
+        session.close()
+
+
 def test_non_time_query():
     with IoTDBContainer("iotdb:dev") as db:
         db: IoTDBContainer

Reply via email to