This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f3b76555802 [Py-Client] Support table model with python client (#13443)
f3b76555802 is described below
commit f3b76555802d71cf1bce38899894b049ad28cfa5
Author: Haonan <[email protected]>
AuthorDate: Tue Sep 10 18:23:37 2024 +0800
[Py-Client] Support table model with python client (#13443)
* support query
* implement insert_relational_tablet
* session pool
* fix 3.6
* fix 3.6
* fix 3.6
* run tests
* fix tests
* fix tests
* fix tests
* fix tests
* fix tests
* fix tests
* fix tests
* fix review
* fix review
---
iotdb-client/client-py/TableModelSessionExample.py | 154 ++++++++++++++++++++
.../client-py/TableModelSessionPoolExample.py | 160 +++++++++++++++++++++
iotdb-client/client-py/iotdb/Session.py | 113 +++++++++++++--
iotdb-client/client-py/iotdb/SessionPool.py | 11 ++
iotdb-client/client-py/iotdb/utils/Field.py | 15 +-
.../client-py/iotdb/utils/IoTDBConstants.py | 6 +
.../client-py/iotdb/utils/IoTDBRpcDataSet.py | 16 +--
iotdb-client/client-py/iotdb/utils/NumpyTablet.py | 58 +++++---
.../client-py/iotdb/utils/SessionDataSet.py | 4 +-
iotdb-client/client-py/iotdb/utils/Tablet.py | 67 +++++++--
.../tests/integration/test_relational_session.py | 143 ++++++++++++++++++
11 files changed, 692 insertions(+), 55 deletions(-)
diff --git a/iotdb-client/client-py/TableModelSessionExample.py
b/iotdb-client/client-py/TableModelSessionExample.py
new file mode 100644
index 00000000000..3a31a463792
--- /dev/null
+++ b/iotdb-client/client-py/TableModelSessionExample.py
@@ -0,0 +1,154 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import numpy as np
+
+from iotdb.Session import Session
+from iotdb.utils.IoTDBConstants import TSDataType
+from iotdb.utils.NumpyTablet import NumpyTablet
+from iotdb.utils.Tablet import ColumnType, Tablet
+
+# creating session connection.
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = "root"
+password_ = "root"
+
+# don't specify database in constructor
+session = Session(ip, port_, username_, password_, sql_dialect="table")
+session.open(False)
+
+session.execute_non_query_statement("CREATE DATABASE test1")
+session.execute_non_query_statement("CREATE DATABASE test2")
+session.execute_non_query_statement("use test2")
+
+# or use full qualified table name
+session.execute_non_query_statement(
+ "create table test1.table1("
+ "region_id STRING ID, plant_id STRING ID, device_id STRING ID, "
+ "model STRING ATTRIBUTE, temperature FLOAT MEASUREMENT, humidity DOUBLE
MEASUREMENT) with (TTL=3600000)"
+)
+session.execute_non_query_statement(
+ "create table table2("
+ "region_id STRING ID, plant_id STRING ID, color STRING ATTRIBUTE,
temperature FLOAT MEASUREMENT,"
+ " speed DOUBLE MEASUREMENT) with (TTL=6600000)"
+)
+
+# show tables from current database
+with session.execute_query_statement("SHOW TABLES") as session_data_set:
+ print(session_data_set.get_column_names())
+ while session_data_set.has_next():
+ print(session_data_set.next())
+
+# show tables by specifying another database
+# using SHOW tables FROM
+with session.execute_query_statement("SHOW TABLES FROM test1") as
session_data_set:
+ print(session_data_set.get_column_names())
+ while session_data_set.has_next():
+ print(session_data_set.next())
+
+session.close()
+
+# specify database in constructor
+session = Session(
+ ip, port_, username_, password_, sql_dialect="table", database="test1"
+)
+session.open(False)
+
+# show tables from current database
+with session.execute_query_statement("SHOW TABLES") as session_data_set:
+ print(session_data_set.get_column_names())
+ while session_data_set.has_next():
+ print(session_data_set.next())
+
+# change database to test2
+session.execute_non_query_statement("use test2")
+
+# show tables from the current database, which is now test2
+# (after USE, no FROM clause is needed)
+with session.execute_query_statement("SHOW TABLES") as session_data_set:
+ print(session_data_set.get_column_names())
+ while session_data_set.has_next():
+ print(session_data_set.next())
+
+session.close()
+
+# insert tablet by insert_relational_tablet
+session = Session(ip, port_, username_, password_, sql_dialect="table")
+session.open(False)
+session.execute_non_query_statement("CREATE DATABASE IF NOT EXISTS db1")
+session.execute_non_query_statement('USE "db1"')
+session.execute_non_query_statement(
+ "CREATE TABLE table5 (id1 string id, attr1 string attribute, "
+ + "m1 double "
+ + "measurement)"
+)
+
+column_names = [
+ "id1",
+ "attr1",
+ "m1",
+]
+data_types = [
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.DOUBLE,
+]
+column_types = [ColumnType.ID, ColumnType.ATTRIBUTE, ColumnType.MEASUREMENT]
+timestamps = []
+values = []
+for row in range(15):
+ timestamps.append(row)
+ values.append(["id:" + str(row), "attr:" + str(row), row * 1.0])
+tablet = Tablet("table5", column_names, data_types, values, timestamps,
column_types)
+session.insert_relational_tablet(tablet)
+
+session.execute_non_query_statement("FLush")
+
+np_timestamps = np.arange(15, 30, dtype=np.dtype(">i8"))
+np_values = [
+ np.array(["id:{}".format(i) for i in range(15, 30)]),
+ np.array(["attr:{}".format(i) for i in range(15, 30)]),
+ np.linspace(15.0, 29.0, num=15, dtype=TSDataType.DOUBLE.np_dtype()),
+]
+
+np_tablet = NumpyTablet(
+ "table5",
+ column_names,
+ data_types,
+ np_values,
+ np_timestamps,
+ column_types=column_types,
+)
+session.insert_relational_tablet(np_tablet)
+
+with session.execute_query_statement("select * from table5 order by time") as
dataset:
+ print(dataset.get_column_names())
+ while dataset.has_next():
+ row_record = dataset.next()
+ # print(row_record.get_fields()[0].get_long_value())
+ # print(row_record.get_fields()[1].get_string_value())
+ # print(row_record.get_fields()[2].get_string_value())
+ # print(row_record.get_fields()[3].get_double_value())
+ print(row_record)
+
+with session.execute_query_statement("select * from table5 order by time") as
dataset:
+ df = dataset.todf()
+ print(df)
+
+# close session connection.
+session.close()
diff --git a/iotdb-client/client-py/TableModelSessionPoolExample.py
b/iotdb-client/client-py/TableModelSessionPoolExample.py
new file mode 100644
index 00000000000..44d6c353499
--- /dev/null
+++ b/iotdb-client/client-py/TableModelSessionPoolExample.py
@@ -0,0 +1,160 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import threading
+
+import numpy as np
+
+from iotdb.SessionPool import PoolConfig, SessionPool
+from iotdb.utils.IoTDBConstants import TSDataType
+from iotdb.utils.NumpyTablet import NumpyTablet
+from iotdb.utils.Tablet import ColumnType, Tablet
+
+
+def prepare_data():
+ print("create database")
+ # Get a session from the pool
+ session = session_pool.get_session()
+ session.execute_non_query_statement("CREATE DATABASE IF NOT EXISTS db1")
+ session.execute_non_query_statement('USE "db1"')
+ session.execute_non_query_statement(
+ "CREATE TABLE table0 (id1 string id, attr1 string attribute, "
+ + "m1 double "
+ + "measurement)"
+ )
+ session.execute_non_query_statement(
+ "CREATE TABLE table1 (id1 string id, attr1 string attribute, "
+ + "m1 double "
+ + "measurement)"
+ )
+
+ print("now the tables are:")
+ # show result
+ res = session.execute_query_statement("SHOW TABLES")
+ while res.has_next():
+ print(res.next())
+
+ session_pool.put_back(session)
+
+
+def insert_data(num: int):
+ print("insert data for table" + str(num))
+ # Get a session from the pool
+ session = session_pool.get_session()
+ column_names = [
+ "id1",
+ "attr1",
+ "m1",
+ ]
+ data_types = [
+ TSDataType.STRING,
+ TSDataType.STRING,
+ TSDataType.DOUBLE,
+ ]
+ column_types = [ColumnType.ID, ColumnType.ATTRIBUTE,
ColumnType.MEASUREMENT]
+ timestamps = []
+ values = []
+ for row in range(15):
+ timestamps.append(row)
+ values.append(["id:" + str(row), "attr:" + str(row), row * 1.0])
+ tablet = Tablet(
+ "table" + str(num), column_names, data_types, values, timestamps,
column_types
+ )
+ session.insert_relational_tablet(tablet)
+ session.execute_non_query_statement("FLush")
+
+ np_timestamps = np.arange(15, 30, dtype=np.dtype(">i8"))
+ np_values = [
+ np.array(["id:{}".format(i) for i in range(15, 30)]),
+ np.array(["attr:{}".format(i) for i in range(15, 30)]),
+ np.linspace(15.0, 29.0, num=15, dtype=TSDataType.DOUBLE.np_dtype()),
+ ]
+
+ np_tablet = NumpyTablet(
+ "table" + str(num),
+ column_names,
+ data_types,
+ np_values,
+ np_timestamps,
+ column_types=column_types,
+ )
+ session.insert_relational_tablet(np_tablet)
+ session_pool.put_back(session)
+
+
+def query_data():
+ # Borrow a session from the pool and print every row of table0, then table1
+ session = session_pool.get_session()
+
+ print("get data from table0")
+ res = session.execute_query_statement("select * from table0")
+ while res.has_next():
+ print(res.next())
+
+ print("get data from table1")
+ res = session.execute_query_statement("select * from table1")
+ while res.has_next():
+ print(res.next())
+
+ session_pool.put_back(session)
+
+
+def delete_data():
+ session = session_pool.get_session()
+ session.execute_non_query_statement("drop database db1")
+ print("data has been deleted. now the databases are:")
+ res = session.execute_statement("show databases")
+ while res.has_next():
+ print(res.next())
+ session_pool.put_back(session)
+
+
+ip = "127.0.0.1"
+port = "6667"
+username = "root"
+password = "root"
+pool_config = PoolConfig(
+ node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
+ user_name=username,
+ password=password,
+ fetch_size=1024,
+ time_zone="UTC+8",
+ max_retry=3,
+ sql_dialect="table",
+ database="db1",
+)
+max_pool_size = 5
+wait_timeout_in_ms = 3000
+
+# Create a session pool
+session_pool = SessionPool(pool_config, max_pool_size, wait_timeout_in_ms)
+
+prepare_data()
+
+insert_thread1 = threading.Thread(target=insert_data, args=(0,))
+insert_thread2 = threading.Thread(target=insert_data, args=(1,))
+
+insert_thread1.start()
+insert_thread2.start()
+
+insert_thread1.join()
+insert_thread2.join()
+
+query_data()
+delete_data()
+session_pool.close()
+print("example is finished!")
diff --git a/iotdb-client/client-py/iotdb/Session.py
b/iotdb-client/client-py/iotdb/Session.py
index 5aca06c2781..098faf0d520 100644
--- a/iotdb-client/client-py/iotdb/Session.py
+++ b/iotdb-client/client-py/iotdb/Session.py
@@ -73,6 +73,7 @@ class Session(object):
DEFAULT_PASSWORD = "root"
DEFAULT_ZONE_ID = time.strftime("%z")
RETRY_NUM = 3
+ SQL_DIALECT = "tree"
def __init__(
self,
@@ -83,6 +84,8 @@ class Session(object):
fetch_size=DEFAULT_FETCH_SIZE,
zone_id=DEFAULT_ZONE_ID,
enable_redirection=True,
+ sql_dialect=SQL_DIALECT,
+ database=None,
):
self.__host = host
self.__port = port
@@ -103,6 +106,8 @@ class Session(object):
self.__enable_redirection = enable_redirection
self.__device_id_to_endpoint = None
self.__endpoint_to_connection = None
+ self.__sql_dialect = sql_dialect
+ self.__database = database
@classmethod
def init_from_node_urls(
@@ -113,11 +118,21 @@ class Session(object):
fetch_size=DEFAULT_FETCH_SIZE,
zone_id=DEFAULT_ZONE_ID,
enable_redirection=True,
+ sql_dialect=SQL_DIALECT,  # match __init__; None would be sent as a configuration map value in open()
+ database=None,
):
if node_urls is None:
raise RuntimeError("node urls is empty")
session = Session(
- None, None, user, password, fetch_size, zone_id, enable_redirection
+ None,
+ None,
+ user,
+ password,
+ fetch_size,
+ zone_id,
+ enable_redirection,
+ sql_dialect=sql_dialect,
+ database=database,
)
session.__hosts = []
session.__ports = []
@@ -180,12 +195,15 @@ class Session(object):
else:
client =
Client(TBinaryProtocol.TBinaryProtocolAccelerated(transport))
+ configuration = {"version": "V_1_0", "sql_dialect": self.__sql_dialect}
+ if self.__database is not None:
+ configuration["db"] = self.__database
open_req = TSOpenSessionReq(
client_protocol=self.protocol_version,
username=self.__user,
password=self.__password,
zoneId=self.__zone_id,
- configuration={"version": "V_1_0"},
+ configuration=configuration,
)
try:
@@ -890,13 +908,15 @@ class Session(object):
"""
request = self.gen_insert_tablet_req(tablet)
try:
- connection = self.get_connection(tablet.get_device_id())
+ connection = self.get_connection(tablet.get_insert_target_name())
request.sessionId = connection.session_id
return Session.verify_success_with_redirection(
connection.client.insertTablet(request)
)
except RedirectException as e:
- return self.handle_redirection(tablet.get_device_id(),
e.redirect_node)
+ return self.handle_redirection(
+ tablet.get_insert_target_name(), e.redirect_node
+ )
except TTransport.TException as e:
if self.reconnect():
try:
@@ -915,14 +935,14 @@ class Session(object):
if self.__enable_redirection:
request_group = {}
for i in range(len(tablet_lst)):
- connection = self.get_connection(tablet_lst[i].get_device_id())
+ connection =
self.get_connection(tablet_lst[i].get_insert_target_name())
request = request_group.setdefault(
connection.client,
TSInsertTabletsReq(
connection.session_id, [], [], [], [], [], [], False
),
)
- request.prefixPaths.append(tablet_lst[i].get_device_id())
+
request.prefixPaths.append(tablet_lst[i].get_insert_target_name())
request.timestampsList.append(tablet_lst[i].get_binary_timestamps())
request.measurementsList.append(tablet_lst[i].get_measurements())
request.valuesList.append(tablet_lst[i].get_binary_values())
@@ -981,13 +1001,15 @@ class Session(object):
"""
request = self.gen_insert_tablet_req(tablet, True)
try:
- connection = self.get_connection(tablet.get_device_id())
+ connection = self.get_connection(tablet.get_insert_target_name())
request.sessionId = connection.session_id
return Session.verify_success_with_redirection(
connection.client.insertTablet(request)
)
except RedirectException as e:
- return self.handle_redirection(tablet.get_device_id(),
e.redirect_node)
+ return self.handle_redirection(
+ tablet.get_insert_target_name(), e.redirect_node
+ )
except TTransport.TException as e:
if self.reconnect():
try:
@@ -1006,14 +1028,14 @@ class Session(object):
if self.__enable_redirection:
request_group = {}
for i in range(len(tablet_lst)):
- connection = self.get_connection(tablet_lst[i].get_device_id())
+ connection =
self.get_connection(tablet_lst[i].get_insert_target_name())
request = request_group.setdefault(
connection.client,
TSInsertTabletsReq(
connection.session_id, [], [], [], [], [], [], True
),
)
- request.prefixPaths.append(tablet_lst[i].get_device_id())
+
request.prefixPaths.append(tablet_lst[i].get_insert_target_name())
request.timestampsList.append(tablet_lst[i].get_binary_timestamps())
request.measurementsList.append(tablet_lst[i].get_measurements())
request.valuesList.append(tablet_lst[i].get_binary_values())
@@ -1058,6 +1080,36 @@ class Session(object):
self.connection_error_msg()
) from None
+ def insert_relational_tablet(self, tablet):
+ """
+ insert one tablet, for example three column in the table1 can form a
tablet:
+ timestamps, id1, attr1, m1
+ 1, id:1, attr:1, 1.0
+ 2, id:1, attr:1, 2.0
+ 3, id:2, attr:2, 3.0
+ :param tablet: a tablet specified above
+ """
+ request = self.gen_insert_relational_tablet_req(tablet)
+ try:
+ connection = self.get_connection(tablet.get_insert_target_name())
+ request.sessionId = connection.session_id
+ return Session.verify_success_with_redirection(
+ connection.client.insertTablet(request)
+ )
+ except RedirectException as e:
+ return self.handle_redirection(
+ tablet.get_insert_target_name(), e.redirect_node
+ )
+ except TTransport.TException as e:
+ if self.reconnect():
+ try:
+ request.sessionId = self.__session_id
+ return
Session.verify_success(self.__client.insertTablet(request))
+ except TTransport.TException as e1:
+ raise IoTDBConnectionException(e1) from None
+ else:
+ raise IoTDBConnectionException(self.connection_error_msg())
from None
+
def insert_records_of_one_device(
self, device_id, times_list, measurements_list, types_list, values_list
):
@@ -1274,13 +1326,27 @@ class Session(object):
def gen_insert_tablet_req(self, tablet, is_aligned=False):
return TSInsertTabletReq(
self.__session_id,
- tablet.get_device_id(),
+ tablet.get_insert_target_name(),
+ tablet.get_measurements(),
+ tablet.get_binary_values(),
+ tablet.get_binary_timestamps(),
+ tablet.get_data_types(),
+ tablet.get_row_number(),
+ is_aligned,
+ )
+
+ def gen_insert_relational_tablet_req(self, tablet, is_aligned=False):
+ return TSInsertTabletReq(
+ self.__session_id,
+ tablet.get_insert_target_name(),
tablet.get_measurements(),
tablet.get_binary_values(),
tablet.get_binary_timestamps(),
tablet.get_data_types(),
tablet.get_row_number(),
is_aligned,
+ True,
+ tablet.get_column_categories(),
)
def gen_insert_tablets_req(self, tablet_lst, is_aligned=False):
@@ -1291,7 +1357,7 @@ class Session(object):
type_lst = []
size_lst = []
for tablet in tablet_lst:
- device_id_lst.append(tablet.get_device_id())
+ device_id_lst.append(tablet.get_insert_target_name())
measurements_lst.append(tablet.get_measurements())
values_lst.append(tablet.get_binary_values())
timestamps_lst.append(tablet.get_binary_timestamps())
@@ -1364,6 +1430,18 @@ class Session(object):
else:
raise IoTDBConnectionException(self.connection_error_msg())
from None
+ previous_db = self.__database
+ if resp.database is not None:
+ self.__database = resp.database
+ if previous_db != self.__database and self.__endpoint_to_connection is
not None:
+ iterator = iter(self.__endpoint_to_connection.items())
+ for entry in list(iterator):
+ endpoint, connection = entry
+ if connection != self.__default_connection:
+ try:
+ connection.change_database(sql)
+ except Exception as e:
+ self.__endpoint_to_connection.pop(endpoint)
return Session.verify_success(resp.status)
def execute_statement(self, sql: str, timeout=0):
@@ -2222,6 +2300,17 @@ class SessionConnection(object):
self.session_id = session_id
self.statement_id = statement_id
+ def change_database(self, sql):
+ try:
+ self.client.executeUpdateStatement(
+ TSExecuteStatementReq(self.session_id, sql, self.statement_id)
+ )
+ except TTransport.TException as e:
+ raise IoTDBConnectionException(
+ "failed to change database",
+ e,
+ ) from None
+
def close_connection(self, req):
try:
self.client.closeSession(req)
diff --git a/iotdb-client/client-py/iotdb/SessionPool.py
b/iotdb-client/client-py/iotdb/SessionPool.py
index 6f1d7580796..9585bfe3e10 100644
--- a/iotdb-client/client-py/iotdb/SessionPool.py
+++ b/iotdb-client/client-py/iotdb/SessionPool.py
@@ -27,6 +27,7 @@ DEFAULT_MULTIPIE = 5
DEFAULT_FETCH_SIZE = 5000
DEFAULT_MAX_RETRY = 3
DEFAULT_TIME_ZONE = "UTC+8"
+SQL_DIALECT = "tree"
logger = logging.getLogger("IoTDB")
@@ -42,6 +43,8 @@ class PoolConfig(object):
time_zone: str = DEFAULT_TIME_ZONE,
max_retry: int = DEFAULT_MAX_RETRY,
enable_compression: bool = False,
+ sql_dialect: str = SQL_DIALECT,
+ database: str = None,
):
self.host = host
self.port = port
@@ -58,6 +61,8 @@ class PoolConfig(object):
self.time_zone = time_zone
self.max_retry = max_retry
self.enable_compression = enable_compression
+ self.sql_dialect = sql_dialect
+ self.database = database
class SessionPool(object):
@@ -80,6 +85,9 @@ class SessionPool(object):
self.__pool_config.password,
self.__pool_config.fetch_size,
self.__pool_config.time_zone,
+ enable_redirection=True,
+ sql_dialect=self.__pool_config.sql_dialect,
+ database=self.__pool_config.database,
)
else:
@@ -90,6 +98,9 @@ class SessionPool(object):
self.__pool_config.password,
self.__pool_config.fetch_size,
self.__pool_config.time_zone,
+ enable_redirection=True,
+ sql_dialect=self.__pool_config.sql_dialect,
+ database=self.__pool_config.database,
)
session.open(self.__pool_config.enable_compression)
diff --git a/iotdb-client/client-py/iotdb/utils/Field.py
b/iotdb-client/client-py/iotdb/utils/Field.py
index 913c7594bf5..7b4b05eab72 100644
--- a/iotdb-client/client-py/iotdb/utils/Field.py
+++ b/iotdb-client/client-py/iotdb/utils/Field.py
@@ -38,13 +38,19 @@ class Field(object):
output.set_bool_value(field.get_bool_value())
elif output.get_data_type() == TSDataType.INT32:
output.set_int_value(field.get_int_value())
- elif output.get_data_type() == TSDataType.INT64:
+ elif (
+ output.get_data_type() == TSDataType.INT64
+ or output.get_data_type() == TSDataType.TIMESTAMP
+ ):
output.set_long_value(field.get_long_value())
elif output.get_data_type() == TSDataType.FLOAT:
output.set_float_value(field.get_float_value())
elif output.get_data_type() == TSDataType.DOUBLE:
output.set_double_value(field.get_double_value())
- elif output.get_data_type() == TSDataType.TEXT:
+ elif (
+ output.get_data_type() == TSDataType.TEXT
+ or output.get_data_type() == TSDataType.STRING
+ ):
output.set_binary_value(field.get_binary_value())
else:
raise Exception(
@@ -94,6 +100,7 @@ class Field(object):
raise Exception("Null Field Exception!")
if (
self.__data_type != TSDataType.INT64
+ and self.__data_type != TSDataType.TIMESTAMP
or self.value is None
or self.value is pd.NA
):
@@ -136,6 +143,8 @@ class Field(object):
raise Exception("Null Field Exception!")
if (
self.__data_type != TSDataType.TEXT
+ and self.__data_type != TSDataType.STRING
+ and self.__data_type != TSDataType.BLOB
or self.value is None
or self.value is pd.NA
):
@@ -145,7 +154,7 @@ class Field(object):
def get_string_value(self):
if self.__data_type is None or self.value is None or self.value is
pd.NA:
return "None"
- elif self.__data_type == 5:
+ elif self.__data_type == 5 or self.__data_type == 11:
return self.value.decode("utf-8")
else:
return str(self.get_object_value(self.__data_type))
diff --git a/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py
b/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py
index 9b671663ff4..daae37d23ba 100644
--- a/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py
+++ b/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py
@@ -27,6 +27,10 @@ class TSDataType(IntEnum):
FLOAT = 3
DOUBLE = 4
TEXT = 5
+ TIMESTAMP = 8
+ DATE = 9
+ BLOB = 10
+ STRING = 11
def np_dtype(self):
return {
@@ -36,6 +40,8 @@ class TSDataType(IntEnum):
TSDataType.INT32: np.dtype(">i4"),
TSDataType.INT64: np.dtype(">i8"),
TSDataType.TEXT: np.dtype("str"),
+ TSDataType.TIMESTAMP: np.dtype(">i8"),
+ TSDataType.STRING: np.dtype("str"),
}[self]
diff --git a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
index 3151b2acfd8..2aa885125d8 100644
--- a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
+++ b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
@@ -182,11 +182,11 @@ class IoTDBRpcDataSet(object):
data_array = np.frombuffer(
value_buffer, np.dtype(np.int32).newbyteorder(">")
)
- elif data_type == 2:
+ elif data_type == 2 or data_type == 8:
data_array = np.frombuffer(
value_buffer, np.dtype(np.int64).newbyteorder(">")
)
- elif data_type == 5:
+ elif data_type == 5 or data_type == 11:
j = 0
offset = 0
data_array = []
@@ -225,7 +225,7 @@ class IoTDBRpcDataSet(object):
if data_type == 1:
tmp_array = pd.Series(tmp_array, dtype="Int32")
- elif data_type == 2:
+ elif data_type == 2 or data_type == 8:
tmp_array = pd.Series(tmp_array, dtype="Int64")
elif data_type == 0:
tmp_array = pd.Series(tmp_array, dtype="boolean")
@@ -297,11 +297,11 @@ class IoTDBRpcDataSet(object):
data_array = np.frombuffer(
value_buffer, np.dtype(np.int32).newbyteorder(">")
)
- elif data_type == 2:
+ elif data_type == 2 or data_type == 8:
data_array = np.frombuffer(
value_buffer, np.dtype(np.int64).newbyteorder(">")
)
- elif data_type == 5:
+ elif data_type == 5 or data_type == 11:
j = 0
offset = 0
data_array = []
@@ -327,13 +327,13 @@ class IoTDBRpcDataSet(object):
self.__query_data_set.valueList[location] = None
tmp_array = []
if len(data_array) < total_length:
- if data_type == 1 or data_type == 2:
+ if data_type == 1 or data_type == 2 or data_type == 8:
tmp_array = np.full(total_length, np.nan, np.float32)
elif data_type == 3 or data_type == 4:
tmp_array = np.full(total_length, np.nan,
data_array.dtype)
elif data_type == 0:
tmp_array = np.full(total_length, np.nan, np.float32)
- elif data_type == 5:
+ elif data_type == 5 or data_type == 11 or data_type == 10:
tmp_array = np.full(total_length, None,
dtype=data_array.dtype)
bitmap_buffer = self.__query_data_set.bitmapList[location]
@@ -345,7 +345,7 @@ class IoTDBRpcDataSet(object):
if data_type == 1:
tmp_array = pd.Series(tmp_array).astype("Int32")
- elif data_type == 2:
+ elif data_type == 2 or data_type == 8:
tmp_array = pd.Series(tmp_array).astype("Int64")
elif data_type == 0:
tmp_array = pd.Series(tmp_array).astype("boolean")
diff --git a/iotdb-client/client-py/iotdb/utils/NumpyTablet.py
b/iotdb-client/client-py/iotdb/utils/NumpyTablet.py
index 4577f7f880c..f00b61de519 100644
--- a/iotdb-client/client-py/iotdb/utils/NumpyTablet.py
+++ b/iotdb-client/client-py/iotdb/utils/NumpyTablet.py
@@ -17,28 +17,45 @@
#
import struct
+
+from numpy import ndarray
+from typing import List
from iotdb.utils.IoTDBConstants import TSDataType
from iotdb.utils.BitMap import BitMap
+from iotdb.utils.Tablet import ColumnType
class NumpyTablet(object):
def __init__(
- self, device_id, measurements, data_types, values, timestamps,
bitmaps=None
+ self,
+ insert_target_name: str,
+ column_names: List[str],
+ data_types: List[TSDataType],
+ values: List[ndarray],
+ timestamps: ndarray,
+ bitmaps: List[BitMap] = None,
+ column_types: List[ColumnType] = None,
):
"""
creating a numpy tablet for insertion
- for example, considering device: root.sg1.d1
+ for example using tree-model, considering device: root.sg1.d1
timestamps, m1, m2, m3
1, 125.3, True, text1
2, 111.6, False, text2
3, 688.6, True, text3
- Notice: From 0.13.0, the tablet can contain empty cell
- The tablet will be sorted at the initialization by timestamps
- :param device_id: String, IoTDB time series path to device layer
(without sensor)
- :param measurements: List, sensors
- :param data_types: TSDataType List, specify value types for sensors
- :param values: List of numpy array, the values of each column should
be the inner numpy array
- :param timestamps: Numpy array, the timestamps
+ for example using table-model, considering table: table1
+ timestamps, id1, attr1, m1
+ 1, id:1, attr:1, 1.0
+ 2, id:1, attr:1, 2.0
+ 3, id:2, attr:2, 3.0
+ Notice: The tablet will be sorted at the initialization by timestamps
+ :param insert_target_name: Str, DeviceId if using tree-view interfaces
or TableName when using table-view interfaces.
+ :param column_names: Str List, names of columns
+ :param data_types: TSDataType List, specify value types for columns
+ :param values: ndarray List, one ndarray contains the value of one
column
+ :param timestamps: ndarray, contains the timestamps
+ :param bitmaps: BitMap list, one bitmap records the position of none
value in a column
+ :param column_types: ColumnType List, marking the type of each column,
can be none for tree-view interfaces.
"""
if len(values) > 0 and len(values[0]) != len(timestamps):
raise RuntimeError(
@@ -63,12 +80,18 @@ class NumpyTablet(object):
self.__values = values
self.__timestamps = timestamps
- self.__device_id = device_id
- self.__measurements = measurements
+ self.__insert_target_name = insert_target_name
+ self.__measurements = column_names
self.__data_types = data_types
self.__row_number = len(timestamps)
- self.__column_number = len(measurements)
+ self.__column_number = len(column_names)
self.bitmaps = bitmaps
+ if column_types is None:
+ self.__column_types = ColumnType.n_copy(
+ ColumnType.MEASUREMENT, self.__column_number
+ )
+ else:
+ self.__column_types = column_types
@staticmethod
def check_sorted(timestamps):
@@ -83,11 +106,14 @@ class NumpyTablet(object):
def get_data_types(self):
return self.__data_types
+ def get_column_categories(self):
+ return self.__column_types
+
def get_row_number(self):
return self.__row_number
- def get_device_id(self):
- return self.__device_id
+ def get_insert_target_name(self):
+ return self.__insert_target_name
def get_timestamps(self):
return self.__timestamps
@@ -102,8 +128,8 @@ class NumpyTablet(object):
bs_len = 0
bs_list = []
for data_type, value in zip(self.__data_types, self.__values):
- # TEXT
- if data_type == 5:
+ # TEXT, STRING, BLOB
+ if data_type == 5 or data_type == 11 or data_type == 10:
format_str_list = [">"]
values_tobe_packed = []
for str_list in value:
diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
index a7ce7dfc40c..c69aca3ebbe 100644
--- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
+++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
@@ -124,7 +124,7 @@ class SessionDataSet(object):
def close_operation_handle(self):
self.iotdb_rpc_data_set.close()
- def todf(self):
+ def todf(self) -> pd.DataFrame:
return resultset_to_pandas(self)
@@ -147,6 +147,8 @@ def get_typed_point(field: Field, none_value=None):
TSDataType.INT32: lambda field: field.get_int_value(),
TSDataType.DOUBLE: lambda field: field.get_double_value(),
TSDataType.INT64: lambda field: field.get_long_value(),
+ TSDataType.TIMESTAMP: lambda field: field.get_long_value(),
+ TSDataType.STRING: lambda field: field.get_string_value(),
}
result_next_type: TSDataType = field.get_data_type()
diff --git a/iotdb-client/client-py/iotdb/utils/Tablet.py
b/iotdb-client/client-py/iotdb/utils/Tablet.py
index 508361dc857..0f086530347 100644
--- a/iotdb-client/client-py/iotdb/utils/Tablet.py
+++ b/iotdb-client/client-py/iotdb/utils/Tablet.py
@@ -17,26 +17,54 @@
#
import struct
-
+from enum import unique, IntEnum
+from typing import List
from iotdb.utils.BitMap import BitMap
+from iotdb.utils.IoTDBConstants import TSDataType
+
+
+@unique
+class ColumnType(IntEnum):
+ ID = 0
+ MEASUREMENT = 1
+ ATTRIBUTE = 2
+
+ def n_copy(self, n):
+ result = []
+ for i in range(n):
+ result.append(self)
+ return result
class Tablet(object):
- def __init__(self, device_id, measurements, data_types, values,
timestamps):
+ def __init__(
+ self,
+ insert_target_name: str,
+ column_names: List[str],
+ data_types: List[TSDataType],
+ values: List[List],
+ timestamps: List[int],
+ column_types: List[ColumnType] = None,
+ ):
"""
creating a tablet for insertion
- for example, considering device: root.sg1.d1
+ for example using tree-model, considering device: root.sg1.d1
timestamps, m1, m2, m3
1, 125.3, True, text1
2, 111.6, False, text2
3, 688.6, True, text3
- Notice: From 0.13.0, the tablet can contain empty cell
- The tablet will be sorted at the initialization by timestamps
- :param device_id: String, IoTDB time series path to device layer
(without sensor)
- :param measurements: List, sensors
- :param data_types: TSDataType List, specify value types for sensors
+ for example using table-model, considering table: table1
+ timestamps, id1, attr1, m1
+ 1, id:1, attr:1, 1.0
+ 2, id:1, attr:1, 2.0
+ 3, id:2, attr:2, 3.0
+ Notice: The tablet will be sorted at the initialization by timestamps
+ :param insert_target_name: Str, DeviceId if using tree-view interfaces
or TableName when using table-view interfaces.
+ :param column_names: Str List, names of columns
+ :param data_types: TSDataType List, specify value types for columns
:param values: 2-D List, the values of each row should be the outer
list element
- :param timestamps: List,
+ :param timestamps: int List, contains the timestamps
+ :param column_types: ColumnType List, marking the type of each column,
can be none for tree-view interfaces.
"""
if len(timestamps) != len(values):
raise RuntimeError(
@@ -51,11 +79,17 @@ class Tablet(object):
self.__values = values
self.__timestamps = timestamps
- self.__device_id = device_id
- self.__measurements = measurements
+ self.__insert_target_name = insert_target_name
+ self.__measurements = column_names
self.__data_types = data_types
self.__row_number = len(timestamps)
- self.__column_number = len(measurements)
+ self.__column_number = len(column_names)
+ if column_types is None:
+ self.__column_types = ColumnType.n_copy(
+ ColumnType.MEASUREMENT, self.__column_number
+ )
+ else:
+ self.__column_types = column_types
@staticmethod
def check_sorted(timestamps):
@@ -70,11 +104,14 @@ class Tablet(object):
def get_data_types(self):
return self.__data_types
+ def get_column_categories(self):
+ return self.__column_types
+
def get_row_number(self):
return self.__row_number
- def get_device_id(self):
- return self.__device_id
+ def get_insert_target_name(self):
+ return self.__insert_target_name
def get_binary_timestamps(self):
format_str_list = [">"]
@@ -150,7 +187,7 @@ class Tablet(object):
self.__mark_none_value(bitmaps, i, j)
has_none = True
- elif data_type_value == 5:
+ elif data_type_value == 5 or data_type_value == 11:
for j in range(self.__row_number):
if self.__values[j][i] is not None:
if isinstance(self.__values[j][i], str):
diff --git
a/iotdb-client/client-py/tests/integration/test_relational_session.py
b/iotdb-client/client-py/tests/integration/test_relational_session.py
new file mode 100644
index 00000000000..cb5382ddb49
--- /dev/null
+++ b/iotdb-client/client-py/tests/integration/test_relational_session.py
@@ -0,0 +1,143 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import numpy as np
+
+from iotdb.Session import Session
+from iotdb.SessionPool import PoolConfig, create_session_pool
+from iotdb.utils.IoTDBConstants import TSDataType
+from iotdb.utils.NumpyTablet import NumpyTablet
+from iotdb.utils.Tablet import Tablet, ColumnType
+from iotdb.IoTDBContainer import IoTDBContainer
+
+
def test_session():
    """Run the table-model round trip using a directly constructed Session."""
    session_test(use_session_pool=False)
+
+
def test_session_pool():
    """Run the table-model round trip using a Session taken from a pool."""
    session_test(use_session_pool=True)
+
+
def session_test(use_session_pool=False):
    """Exercise the table-model write/read path against a live IoTDB container.

    Opens a ``sql_dialect="table"`` session, either directly or taken from a
    session pool. It then creates database ``db1`` and table ``table5``,
    inserts rows 0-14 from a ``Tablet`` and rows 15-29 from a ``NumpyTablet``,
    and verifies all 30 rows both record-by-record and through ``todf()``.

    :param use_session_pool: if True, obtain the session from a pool built by
        ``create_session_pool``; otherwise construct and open a ``Session``.
    """
    with IoTDBContainer("iotdb:dev") as db:
        db: IoTDBContainer

        session_pool = None
        if use_session_pool:
            pool_config = PoolConfig(
                db.get_container_host_ip(),
                db.get_exposed_port(6667),
                "root",
                "root",
                None,
                1024,
                "Asia/Shanghai",
                3,
                sql_dialect="table",
            )
            session_pool = create_session_pool(pool_config, 1, 3000)
            session = session_pool.get_session()
        else:
            session = Session(
                db.get_container_host_ip(),
                db.get_exposed_port(6667),
                sql_dialect="table",
            )
            session.open(False)

        try:
            # Report a regular test failure; exit() is a site-module helper
            # intended for the interactive interpreter, not for programs.
            assert session.is_open(), "can't open session"

            session.execute_non_query_statement(
                "CREATE DATABASE IF NOT EXISTS db1"
            )
            session.execute_non_query_statement('USE "db1"')
            session.execute_non_query_statement(
                "CREATE TABLE table5 (id1 string id, attr1 string attribute, "
                + "m1 double "
                + "measurement)"
            )

            column_names = ["id1", "attr1", "m1"]
            data_types = [
                TSDataType.STRING,
                TSDataType.STRING,
                TSDataType.DOUBLE,
            ]
            column_types = [
                ColumnType.ID,
                ColumnType.ATTRIBUTE,
                ColumnType.MEASUREMENT,
            ]

            # First batch: rows 0..14 through the list-based Tablet. Every
            # column value is derived from the row's timestamp so the read
            # side can recompute the expectation from the time column alone.
            timestamps = list(range(15))
            values = [
                ["id:" + str(row), "attr:" + str(row), row * 1.0]
                for row in timestamps
            ]
            tablet = Tablet(
                "table5", column_names, data_types, values, timestamps, column_types
            )
            session.insert_relational_tablet(tablet)

            session.execute_non_query_statement("FLush")

            # Second batch: rows 15..29 through the numpy-backed NumpyTablet.
            np_timestamps = np.arange(15, 30, dtype=np.dtype(">i8"))
            np_values = [
                np.array(["id:{}".format(i) for i in range(15, 30)]),
                np.array(["attr:{}".format(i) for i in range(15, 30)]),
                np.linspace(
                    15.0, 29.0, num=15, dtype=TSDataType.DOUBLE.np_dtype()
                ),
            ]
            np_tablet = NumpyTablet(
                "table5",
                column_names,
                data_types,
                np_values,
                np_timestamps,
                column_types=column_types,
            )
            session.insert_relational_tablet(np_tablet)

            with session.execute_query_statement(
                "select * from table5 order by time"
            ) as dataset:
                cnt = 0
                while dataset.has_next():
                    fields = dataset.next().get_fields()
                    timestamp = fields[0].get_long_value()
                    assert "id:" + str(timestamp) == fields[1].get_string_value()
                    assert "attr:" + str(timestamp) == fields[2].get_string_value()
                    assert timestamp * 1.0 == fields[3].get_double_value()
                    cnt += 1
                assert 30 == cnt

            with session.execute_query_statement(
                "select * from table5 order by time"
            ) as dataset:
                df = dataset.todf()
                rows, columns = df.shape
                assert rows == 30
                assert columns == 4
        finally:
            # Release the connection (and the pool, if one was created) even
            # when an assertion or statement above fails.
            session.close()
            if session_pool is not None:
                session_pool.close()