This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch pyclient_timestamp_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 40acdf4f007efcc9df356e7ace9f5affafeada14 Author: HTHou <[email protected]> AuthorDate: Tue Apr 22 16:34:19 2025 +0800 [Py-client] Query Timestamp type of values return readable format --- iotdb-client/client-py/README.md | 4 ++-- iotdb-client/client-py/iotdb/Session.py | 18 ++++++++++++++-- iotdb-client/client-py/iotdb/SessionPool.py | 4 +++- iotdb-client/client-py/iotdb/utils/Field.py | 24 ++++++++++++++++++---- .../client-py/iotdb/utils/SessionDataSet.py | 21 +++++++++++++++---- .../client-py/iotdb/utils/iotdb_rpc_dataset.py | 21 +++++++++++++++---- iotdb-client/client-py/iotdb/utils/rpc_utils.py | 10 +++++++++ iotdb-client/client-py/requirements.txt | 1 + iotdb-client/client-py/resources/pyproject.toml | 3 ++- .../session_aligned_timeseries_example.py | 4 +++- iotdb-client/client-py/session_example.py | 4 ++-- iotdb-client/client-py/session_pool_example.py | 2 +- iotdb-client/client-py/session_ssl_example.py | 4 ++-- .../client-py/table_model_session_example.py | 4 ++-- .../integration/tablet_performance_comparison.py | 4 +++- 15 files changed, 101 insertions(+), 27 deletions(-) diff --git a/iotdb-client/client-py/README.md b/iotdb-client/client-py/README.md index cf87012e036..2cdf23b5e12 100644 --- a/iotdb-client/client-py/README.md +++ b/iotdb-client/client-py/README.md @@ -73,7 +73,7 @@ session.close() * Initialize a Session ```python -session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") +session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="Asia/Shanghai") ``` * Open a session, with a parameter to specify whether to enable RPC compression @@ -375,7 +375,7 @@ ip = "127.0.0.1" port_ = "6667" username_ = "root" password_ = "root" -conn = connect(ip, port_, username_, password_,fetch_size=1024,zone_id="UTC+8",sqlalchemy_mode=False) +conn = connect(ip, port_, username_, password_,fetch_size=1024,zone_id="Asia/Shanghai",sqlalchemy_mode=False) cursor = conn.cursor() ``` + simple SQL statement execution diff --git a/iotdb-client/client-py/iotdb/Session.py b/iotdb-client/client-py/iotdb/Session.py index 6bb9a5860f2..3f44bd41584 100644 --- a/iotdb-client/client-py/iotdb/Session.py +++ b/iotdb-client/client-py/iotdb/Session.py @@ -20,10 +20,10 @@ import logging import random import sys import struct -import time import warnings from thrift.protocol import TBinaryProtocol, TCompactProtocol from thrift.transport import TSocket, TTransport +from tzlocal import get_localzone_name from iotdb.utils.SessionDataSet import SessionDataSet from .template.Template import Template @@ -71,7 +71,7 @@ class Session(object): DEFAULT_FETCH_SIZE = 5000 DEFAULT_USER = "root" DEFAULT_PASSWORD = "root" - DEFAULT_ZONE_ID = time.strftime("%z") + DEFAULT_ZONE_ID = get_localzone_name() RETRY_NUM = 3 SQL_DIALECT = "tree" @@ -112,6 +112,7 @@ class Session(object): self.__use_ssl = use_ssl self.__ca_certs = ca_certs self.__connection_timeout_in_ms = connection_timeout_in_ms + self.__time_precision = "ms" @classmethod def init_from_node_urls( @@ -206,6 +207,11 @@ class Session(object): try: open_resp = client.openSession(open_req) rpc_utils.verify_success(open_resp.status) + if open_resp.configuration is not None: + if "timestamp_precision" in open_resp.configuration: + self.__time_precision = open_resp.configuration[ + "timestamp_precision" + ] if self.protocol_version != open_resp.serverProtocolVersion: logger.exception( @@ -1518,6 +1524,8 @@ class Session(object): timeout, resp.moreData, self.__fetch_size, + self.__zone_id, + self.__time_precision, resp.columnIndex2TsBlockColumnIndexList, ) @@ -1587,6 +1595,8 @@ class Session(object): timeout, resp.moreData, self.__fetch_size, + self.__zone_id, + self.__time_precision, resp.columnIndex2TsBlockColumnIndexList, ) else: @@ -1748,6 +1758,8 @@ class Session(object): 0, resp.moreData, self.__fetch_size, + self.__zone_id, + self.__time_precision, resp.columnIndex2TsBlockColumnIndexList, ) @@ -1793,6 +1805,8 @@ class Session(object): 0, resp.moreData, self.__fetch_size, + self.__zone_id, + self.__time_precision, resp.columnIndex2TsBlockColumnIndexList, ) diff --git a/iotdb-client/client-py/iotdb/SessionPool.py b/iotdb-client/client-py/iotdb/SessionPool.py index 3502cc24eac..1e34186b2e0 100644 --- a/iotdb-client/client-py/iotdb/SessionPool.py +++ b/iotdb-client/client-py/iotdb/SessionPool.py @@ -21,12 +21,14 @@ import time from queue import Queue from threading import Lock +from tzlocal import get_localzone_name + from iotdb.Session import Session DEFAULT_MULTIPIE = 5 DEFAULT_FETCH_SIZE = 5000 DEFAULT_MAX_RETRY = 3 -DEFAULT_TIME_ZONE = "UTC+8" +DEFAULT_TIME_ZONE = get_localzone_name() SQL_DIALECT = "tree" logger = logging.getLogger("IoTDB") diff --git a/iotdb-client/client-py/iotdb/utils/Field.py b/iotdb-client/client-py/iotdb/utils/Field.py index 2b9d7af0f70..b0622cafebe 100644 --- a/iotdb-client/client-py/iotdb/utils/Field.py +++ b/iotdb-client/client-py/iotdb/utils/Field.py @@ -24,12 +24,14 @@ import pandas as pd class Field(object): - def __init__(self, data_type, value=None): + def __init__(self, data_type, value=None, timezone=None, precision=None): """ :param data_type: TSDataType """ self.__data_type = data_type self.value = value + self.__timezone = timezone + self.__precision = precision @staticmethod def copy(field): @@ -157,6 +159,17 @@ class Field(object): return None return self.value + def get_timestamp_value(self): + if self.__data_type is None: + raise Exception("Null Field Exception!") + if ( + self.__data_type != TSDataType.TIMESTAMP + or self.value is None + or self.value is pd.NA + ): + return None + return pd.Timestamp(self.value, unit=self.__precision, tz=self.__timezone) + def get_date_value(self): if self.__data_type is None: raise Exception("Null Field Exception!") @@ -172,11 +185,12 @@ class Field(object): if self.__data_type is None or self.value is None or self.value is pd.NA: return "None" # TEXT, STRING - elif self.__data_type == 5 or self.__data_type == 11: + if self.__data_type == 5 or self.__data_type == 11: return self.value.decode("utf-8") # BLOB elif self.__data_type == 10: return str(hex(int.from_bytes(self.value, byteorder="big"))) + # Others else: return str(self.get_object_value(self.__data_type)) @@ -193,17 +207,19 @@ class Field(object): return bool(self.value) elif data_type == 1: return np.int32(self.value) - elif data_type == 2 or data_type == 8: + elif data_type == 2: return np.int64(self.value) elif data_type == 3: return np.float32(self.value) elif data_type == 4: return np.float64(self.value) + elif data_type == 8: + return pd.Timestamp(self.value, unit=self.__precision, tz=self.__timezone) elif data_type == 9: return parse_int_to_date(self.value) elif data_type == 5 or data_type == 11: return self.value.decode("utf-8") - elif data_type == 10: + elif data_type == 10 or data_type == 8: return self.value else: raise RuntimeError("Unsupported data type:" + str(data_type)) diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py index 6209d13e5f3..b75889e5a96 100644 --- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py +++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py @@ -17,6 +17,8 @@ # import logging +import numpy as np + from iotdb.utils.Field import Field # for package @@ -45,13 +47,14 @@ class SessionDataSet(object): time_out, more_data, fetch_size, + zone_id, + time_precision, column_index_2_tsblock_column_index_list, ): self.iotdb_rpc_data_set = IoTDBRpcDataSet( sql, column_name_list, column_type_list, - column_name_index, ignore_timestamp, more_data, query_id, @@ -61,16 +64,26 @@ class SessionDataSet(object): query_result, fetch_size, time_out, + zone_id, + time_precision, column_index_2_tsblock_column_index_list, ) if ignore_timestamp: self.__field_list = [ - Field(data_type) + ( + Field(data_type, timezone=zone_id, precision=time_precision) + if data_type == 8 + else Field(data_type) + ) for data_type in self.iotdb_rpc_data_set.get_column_types() ] else: self.__field_list = [ - Field(data_type) + ( + Field(data_type, timezone=zone_id, precision=time_precision) + if data_type == 8 + else Field(data_type) + ) for data_type in self.iotdb_rpc_data_set.get_column_types()[1:] ] self.row_index = 0 @@ -155,7 +168,7 @@ def get_typed_point(field: Field, none_value=None): TSDataType.INT32: lambda f: f.get_int_value(), TSDataType.DOUBLE: lambda f: f.get_double_value(), TSDataType.INT64: lambda f: f.get_long_value(), - TSDataType.TIMESTAMP: lambda f: f.get_long_value(), + TSDataType.TIMESTAMP: lambda f: f.get_timestamp_value(), TSDataType.STRING: lambda f: f.get_string_value(), TSDataType.DATE: lambda f: f.get_date_value(), TSDataType.BLOB: lambda f: f.get_binary_value(), diff --git a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py index 8530fef5c20..912ee41595f 100644 --- a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py +++ b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py @@ -41,7 +41,6 @@ class IoTDBRpcDataSet(object): sql, column_name_list, column_type_list, - column_name_index, ignore_timestamp, more_data, query_id, @@ -51,6 +50,8 @@ class IoTDBRpcDataSet(object): query_result, fetch_size, time_out, + zone_id, + time_precision, column_index_2_tsblock_column_index_list, ): self.__statement_id = statement_id @@ -117,6 +118,8 @@ class IoTDBRpcDataSet(object): self.__empty_resultSet = False self.has_cached_data_frame = False self.data_frame = None + self.__zone_id = zone_id + self.__time_precision = time_precision def close(self): if self.__is_closed: @@ -155,7 +158,7 @@ class IoTDBRpcDataSet(object): def construct_one_data_frame(self): if self.has_cached_data_frame or self.__query_result is None: - return True + return result = {} has_pd_series = [] for i in range(len(self.__column_index_2_tsblock_column_index_list)): @@ -264,8 +267,8 @@ class IoTDBRpcDataSet(object): continue data_type = self.__data_type_for_tsblock_column[location] column_array = column_arrays[location] - # BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TIMESTAMP, BLOB - if data_type in (0, 1, 2, 3, 4, 8, 10): + # BOOLEAN, INT32, INT64, FLOAT, DOUBLE, BLOB + if data_type in (0, 1, 2, 3, 4, 10): data_array = column_array if ( data_type != 10 @@ -278,6 +281,16 @@ class IoTDBRpcDataSet(object): # TEXT, STRING elif data_type in (5, 11): data_array = np.array([x.decode("utf-8") for x in column_array]) + # TIMESTAMP + elif data_type == 8: + data_array = pd.Series( + [ + pd.Timestamp( + x, unit=self.__time_precision, tz=self.__zone_id + ) + for x in column_array + ] + ) # DATE elif data_type == 9: data_array = pd.Series(column_array).apply(parse_int_to_date) diff --git a/iotdb-client/client-py/iotdb/utils/rpc_utils.py b/iotdb-client/client-py/iotdb/utils/rpc_utils.py index 6ceb39c6558..5023b0de459 100644 --- a/iotdb-client/client-py/iotdb/utils/rpc_utils.py +++ b/iotdb-client/client-py/iotdb/utils/rpc_utils.py @@ -15,6 +15,9 @@ # specific language governing permissions and limitations # under the License. # +import pandas as pd +from tzlocal import get_localzone_name + from iotdb.thrift.common.ttypes import TSStatus from iotdb.utils.exception import RedirectException, StatementExecutionException @@ -67,3 +70,10 @@ def verify_success_with_redirection_for_multi_devices(status: TSStatus, devices: if status.subStatus[i].redirectNode is not None: device_to_endpoint[devices[i]] = status.subStatus[i].redirectNode raise RedirectException(device_to_endpoint) + + +def convert_to_timestamp(time: int, precision: str, timezone: str): + try: + return pd.Timestamp(time, unit=precision, tz=timezone) + except ValueError: + return pd.Timestamp(time, unit=precision, tz=get_localzone_name()) diff --git a/iotdb-client/client-py/requirements.txt b/iotdb-client/client-py/requirements.txt index 0741cf6db67..980a7442a54 100644 --- a/iotdb-client/client-py/requirements.txt +++ b/iotdb-client/client-py/requirements.txt @@ -23,3 +23,4 @@ thrift>=0.14.1 # SQLAlchemy Dialect sqlalchemy>=1.4 sqlalchemy-utils>=0.37.8 +tzlocal>=4.0 diff --git a/iotdb-client/client-py/resources/pyproject.toml b/iotdb-client/client-py/resources/pyproject.toml index d8102161b43..2b32eefd59b 100644 --- a/iotdb-client/client-py/resources/pyproject.toml +++ b/iotdb-client/client-py/resources/pyproject.toml @@ -42,7 +42,8 @@ dependencies = [ "pandas>=1.0.0", "numpy>=1.0.0", "sqlalchemy>=1.4", - "sqlalchemy-utils>=0.37.8" + "sqlalchemy-utils>=0.37.8", + "tzlocal>=4.0" ] [project.urls] diff --git a/iotdb-client/client-py/session_aligned_timeseries_example.py b/iotdb-client/client-py/session_aligned_timeseries_example.py index 99b9f16a62e..450d69f2818 100644 --- a/iotdb-client/client-py/session_aligned_timeseries_example.py +++ b/iotdb-client/client-py/session_aligned_timeseries_example.py @@ -27,7 +27,9 @@ ip = "127.0.0.1" port_ = "6667" username_ = "root" password_ = "root" -session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") +session = Session( + ip, port_, username_, password_, fetch_size=1024, zone_id="Asia/Shanghai" +) session.open(False) # set and delete databases diff --git a/iotdb-client/client-py/session_example.py b/iotdb-client/client-py/session_example.py index ca610de9a0c..d0a6a3aba8e 100644 --- a/iotdb-client/client-py/session_example.py +++ b/iotdb-client/client-py/session_example.py @@ -30,13 +30,13 @@ ip = "127.0.0.1" port_ = "6667" username_ = "root" password_ = "root" -# session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8", enable_redirection=True) +# session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="Asia/Shanghai", enable_redirection=True) session = Session.init_from_node_urls( node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"], user="root", password="root", fetch_size=1024, - zone_id="UTC+8", + zone_id="Asia/Shanghai", enable_redirection=True, ) session.open(False) diff --git a/iotdb-client/client-py/session_pool_example.py b/iotdb-client/client-py/session_pool_example.py index 9230634ba6f..64a754087fa 100644 --- a/iotdb-client/client-py/session_pool_example.py +++ b/iotdb-client/client-py/session_pool_example.py @@ -120,7 +120,7 @@ pool_config = PoolConfig( user_name=username, password=password, fetch_size=1024, - time_zone="UTC+8", + time_zone="Asia/Shanghai", max_retry=3, ) max_pool_size = 5 diff --git a/iotdb-client/client-py/session_ssl_example.py b/iotdb-client/client-py/session_ssl_example.py index ea367e00f3a..2d5a557afc4 100644 --- a/iotdb-client/client-py/session_ssl_example.py +++ b/iotdb-client/client-py/session_ssl_example.py @@ -49,7 +49,7 @@ def get_data2(): user_name=username_, password=password_, fetch_size=1024, - time_zone="UTC+8", + time_zone="Asia/Shanghai", max_retry=3, use_ssl=use_ssl, ca_certs=ca_certs, @@ -71,7 +71,7 @@ def get_table_data(): username=username_, password=password_, fetch_size=1024, - time_zone="UTC+8", + time_zone="Asia/Shanghai", use_ssl=use_ssl, ca_certs=ca_certs, ) diff --git a/iotdb-client/client-py/table_model_session_example.py b/iotdb-client/client-py/table_model_session_example.py index 86aac09da2d..c9aa62b97a0 100644 --- a/iotdb-client/client-py/table_model_session_example.py +++ b/iotdb-client/client-py/table_model_session_example.py @@ -28,7 +28,7 @@ config = TableSessionConfig( node_urls=["localhost:6667"], username="root", password="root", - time_zone="UTC+8", + time_zone="Asia/Shanghai", ) session = TableSession(config) @@ -69,7 +69,7 @@ config = TableSessionConfig( username="root", password="root", database="test1", - time_zone="UTC+8", + time_zone="Asia/Shanghai", ) session = TableSession(config) diff --git a/iotdb-client/client-py/tests/integration/tablet_performance_comparison.py b/iotdb-client/client-py/tests/integration/tablet_performance_comparison.py index 3626e818a85..c22124ec009 100644 --- a/iotdb-client/client-py/tests/integration/tablet_performance_comparison.py +++ b/iotdb-client/client-py/tests/integration/tablet_performance_comparison.py @@ -113,7 +113,9 @@ def create_open_session(): port_ = "6667" username_ = "root" password_ = "root" - session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") + session = Session( + ip, port_, username_, password_, fetch_size=1024, zone_id="Asia/Shanghai" + ) session.open(False) return session
