This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2e46872766b Support stream DataFrame interface in iotdb python client (#17035)
2e46872766b is described below
commit 2e46872766be071c5fe583414899f5fe5ff87500
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jan 27 10:41:28 2026 +0800
Support stream DataFrame interface in iotdb python client (#17035)
* Support stream DataFrame interface in iotdb python client
* change according to code review
* format code
* Bug fix "optional"
* add IT
* fix illegal fetch_size
---------
Co-authored-by: Yongzao <[email protected]>
Co-authored-by: HTHou <[email protected]>
---
iotdb-client/client-py/iotdb/Session.py | 8 ++-
.../client-py/iotdb/utils/SessionDataSet.py | 19 ++++++
.../client-py/iotdb/utils/iotdb_rpc_dataset.py | 68 ++++++++++++++++++++++
iotdb-client/client-py/session_example.py | 6 ++
.../client-py/table_model_session_example.py | 4 ++
.../client-py/tests/integration/test_dataframe.py | 50 ++++++++++++++++
6 files changed, 154 insertions(+), 1 deletion(-)
diff --git a/iotdb-client/client-py/iotdb/Session.py
b/iotdb-client/client-py/iotdb/Session.py
index 3f44bd41584..a3d89d71746 100644
--- a/iotdb-client/client-py/iotdb/Session.py
+++ b/iotdb-client/client-py/iotdb/Session.py
@@ -95,7 +95,13 @@ class Session(object):
self.__default_endpoint = TEndPoint(self.__host, self.__port)
self.__user = user
self.__password = password
- self.__fetch_size = fetch_size
+ if fetch_size > 0:
+ self.__fetch_size = fetch_size
+ else:
logger.warning(
f"fetch_size {fetch_size} is illegal, use default fetch_size {self.DEFAULT_FETCH_SIZE}"
)
+ self.__fetch_size = self.DEFAULT_FETCH_SIZE
self.__is_close = True
self.__client = None
self.__default_connection = None
diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
index b079ee28c1a..9c5b3ec06dd 100644
--- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
+++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
@@ -16,6 +16,7 @@
# under the License.
#
import logging
+from typing import Optional
from iotdb.utils.Field import Field
@@ -143,6 +144,24 @@ class SessionDataSet(object):
def close_operation_handle(self):
self.iotdb_rpc_data_set.close()
+ def has_next_df(self) -> bool:
+ """
+ Evaluate if there are more DataFrames to be fetched.
+ :return: whether there are more DataFrames to be fetched
+ """
+ # Check if buffer has data or if there are more results to fetch
+ rpc_ds = self.iotdb_rpc_data_set
+ return rpc_ds._has_buffered_data() or rpc_ds._has_next_result_set()
+
+ def next_df(self) -> Optional[pd.DataFrame]:
+ """
+ Get the next DataFrame from the result set.
+ Each returned DataFrame contains exactly fetch_size rows,
+ except for the last DataFrame which may contain fewer rows.
+ :return: the next DataFrame, or None if no more data
+ """
+ return self.iotdb_rpc_data_set.next_dataframe()
+
def todf(self) -> pd.DataFrame:
return result_set_to_pandas(self)
diff --git a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
index dc771368669..0edc76f68fd 100644
--- a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
+++ b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
@@ -18,6 +18,7 @@
# for package
import logging
+from typing import Optional
import numpy as np
import pandas as pd
@@ -120,10 +121,12 @@ class IoTDBRpcDataSet(object):
self.data_frame = None
self.__zone_id = zone_id
self.__time_precision = time_precision
+ self.__df_buffer = None # Buffer for streaming DataFrames
def close(self):
if self.__is_closed:
return
+ self.__df_buffer = None # Clean up streaming DataFrame buffer
if self.__client is not None:
try:
status = self.__client.closeOperation(
@@ -243,11 +246,73 @@ class IoTDBRpcDataSet(object):
return True
return False
+ def _has_buffered_data(self) -> bool:
+ """
+ Check if there is buffered data for streaming DataFrame interface.
+ :return: True if there is buffered data, False otherwise
+ """
+ return self.__df_buffer is not None and len(self.__df_buffer) > 0
+
+ def next_dataframe(self) -> Optional[pd.DataFrame]:
+ """
+ Get the next DataFrame from the result set with exactly fetch_size rows.
+ The last DataFrame may have fewer rows.
+ :return: the next DataFrame with fetch_size rows, or None if no more data
+ """
+ # Accumulate data until we have at least fetch_size rows or no more data
+ while True:
+ buffer_len = 0 if self.__df_buffer is None else len(self.__df_buffer)
+ if buffer_len >= self.__fetch_size:
+ # We have enough rows, return a chunk
+ break
+ if not self._has_next_result_set():
+ # No more data to fetch
+ break
+ # Process and accumulate
+ result = self._process_buffer()
+ new_df = self._build_dataframe(result)
+ if self.__df_buffer is None:
+ self.__df_buffer = new_df
+ else:
+ self.__df_buffer = pd.concat(
+ [self.__df_buffer, new_df], ignore_index=True
+ )
+
+ if self.__df_buffer is None or len(self.__df_buffer) == 0:
+ return None
+
+ if len(self.__df_buffer) <= self.__fetch_size:
+ # Return all remaining rows
+ result_df = self.__df_buffer
+ self.__df_buffer = None
+ return result_df
+ else:
+ # Slice off fetch_size rows
+ result_df = self.__df_buffer.iloc[: self.__fetch_size].reset_index(
+ drop=True
+ )
+ self.__df_buffer = self.__df_buffer.iloc[self.__fetch_size :].reset_index(
+ drop=True
+ )
+ return result_df
+
def result_set_to_pandas(self):
result = {}
for i in range(len(self.__column_index_2_tsblock_column_index_list)):
result[i] = []
while self._has_next_result_set():
+ batch_result = self._process_buffer()
+ for k, v in batch_result.items():
+ result[k].extend(v)
+
+ return self._build_dataframe(result)
+
+ def _process_buffer(self):
+ result = {}
+ for i in range(len(self.__column_index_2_tsblock_column_index_list)):
+ result[i] = []
+
+ while self.__query_result_index < len(self.__query_result):
time_array, column_arrays, null_indicators, array_length = deserialize(
memoryview(self.__query_result[self.__query_result_index])
)
@@ -339,6 +404,9 @@ class IoTDBRpcDataSet(object):
result[i].append(data_array)
+ return result
+
+ def _build_dataframe(self, result):
for k, v in result.items():
if v is None or len(v) < 1 or v[0] is None:
result[k] = []
diff --git a/iotdb-client/client-py/session_example.py
b/iotdb-client/client-py/session_example.py
index d0a6a3aba8e..996339c0088 100644
--- a/iotdb-client/client-py/session_example.py
+++ b/iotdb-client/client-py/session_example.py
@@ -411,6 +411,12 @@ with session.execute_query_statement(
df = dataset.todf()
print(df.to_string())
+with session.execute_query_statement(
+ "select s_01,s_02,s_03,s_04 from root.sg_test_01.d_04"
+) as dataset:
+ while dataset.has_next_df():
+ print(dataset.next_df())
+
# delete database
session.delete_storage_group("root.sg_test_01")
diff --git a/iotdb-client/client-py/table_model_session_example.py
b/iotdb-client/client-py/table_model_session_example.py
index c9aa62b97a0..048f1022b76 100644
--- a/iotdb-client/client-py/table_model_session_example.py
+++ b/iotdb-client/client-py/table_model_session_example.py
@@ -158,5 +158,9 @@ with session.execute_query_statement("select * from table5 order by time") as da
df = dataset.todf()
print(df)
+with session.execute_query_statement("select * from table5 order by time") as dataset:
+ while dataset.has_next_df():
+ print(dataset.next_df())
+
# close session connection.
session.close()
diff --git a/iotdb-client/client-py/tests/integration/test_dataframe.py
b/iotdb-client/client-py/tests/integration/test_dataframe.py
index f314fbac184..61a243a23f4 100644
--- a/iotdb-client/client-py/tests/integration/test_dataframe.py
+++ b/iotdb-client/client-py/tests/integration/test_dataframe.py
@@ -42,6 +42,56 @@ def test_simple_query():
assert_array_equal(df.values, [[123.0, 15.0]])
+def test_stream_query():
+ with IoTDBContainer("iotdb:dev") as db:
+ db: IoTDBContainer
+ session = Session(
+ db.get_container_host_ip(), db.get_exposed_port(6667), fetch_size=1
+ )
+ session.open(False)
+ session.execute_non_query_statement("CREATE DATABASE root.device0")
+
+ # Write data
+ session.insert_str_record("root.device0", 123, "pressure", "15.0")
+ session.insert_str_record("root.device0", 124, "pressure", "15.0")
+ session.insert_str_record("root.device0", 125, "pressure", "15.0")
+
+ # Read
+ session_data_set = session.execute_query_statement("SELECT * FROM root.device0")
+ index = 0
+ while session_data_set.has_next_df():
+ df = session_data_set.next_df()
+ assert list(df.columns) == ["Time", "root.device0.pressure"]
+ assert_array_equal(df.values, [[123.0 + index, 15.0]])
+ index += 1
+ session.close()
+ assert index == 3
+
+
+def test_stream_query_with_illegal_fetch_size():
+ with IoTDBContainer("iotdb:dev") as db:
+ db: IoTDBContainer
+ session = Session(
+ db.get_container_host_ip(), db.get_exposed_port(6667), fetch_size=-1
+ )
+ session.open(False)
+ session.execute_non_query_statement("CREATE DATABASE root.device0")
+
+ # Write data
+ session.insert_str_record("root.device0", 123, "pressure", "15.0")
+ session.insert_str_record("root.device0", 124, "pressure", "15.0")
+ session.insert_str_record("root.device0", 125, "pressure", "15.0")
+
+ # Read
+ session_data_set = session.execute_query_statement("SELECT * FROM root.device0")
+
+ while session_data_set.has_next_df():
+ df = session_data_set.next_df()
+ assert list(df.columns) == ["Time", "root.device0.pressure"]
+ assert_array_equal(df.values, [[123.0, 15.0], [124.0, 15.0], [125.0, 15.0]])
+ session.close()
+
+
def test_non_time_query():
with IoTDBContainer("iotdb:dev") as db:
db: IoTDBContainer