This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f3b76555802 [Py-Client] Support table model with python client (#13443)
f3b76555802 is described below

commit f3b76555802d71cf1bce38899894b049ad28cfa5
Author: Haonan <[email protected]>
AuthorDate: Tue Sep 10 18:23:37 2024 +0800

    [Py-Client] Support table model with python client (#13443)
    
    * support query
    
    * implement insert_relational_tablet
    
    * session pool
    
    * fix 3.6
    
    * fix 3.6
    
    * fix 3.6
    
    * run tests
    
    * fix tests
    
    * fix tests
    
    * fix tests
    
    * fix tests
    
    * fix tests
    
    * fix tests
    
    * fix tests
    
    * fix review
    
    * fix review
---
 iotdb-client/client-py/TableModelSessionExample.py | 154 ++++++++++++++++++++
 .../client-py/TableModelSessionPoolExample.py      | 160 +++++++++++++++++++++
 iotdb-client/client-py/iotdb/Session.py            | 113 +++++++++++++--
 iotdb-client/client-py/iotdb/SessionPool.py        |  11 ++
 iotdb-client/client-py/iotdb/utils/Field.py        |  15 +-
 .../client-py/iotdb/utils/IoTDBConstants.py        |   6 +
 .../client-py/iotdb/utils/IoTDBRpcDataSet.py       |  16 +--
 iotdb-client/client-py/iotdb/utils/NumpyTablet.py  |  58 +++++---
 .../client-py/iotdb/utils/SessionDataSet.py        |   4 +-
 iotdb-client/client-py/iotdb/utils/Tablet.py       |  67 +++++++--
 .../tests/integration/test_relational_session.py   | 143 ++++++++++++++++++
 11 files changed, 692 insertions(+), 55 deletions(-)

diff --git a/iotdb-client/client-py/TableModelSessionExample.py 
b/iotdb-client/client-py/TableModelSessionExample.py
new file mode 100644
index 00000000000..3a31a463792
--- /dev/null
+++ b/iotdb-client/client-py/TableModelSessionExample.py
@@ -0,0 +1,154 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import numpy as np
+
+from iotdb.Session import Session
+from iotdb.utils.IoTDBConstants import TSDataType
+from iotdb.utils.NumpyTablet import NumpyTablet
+from iotdb.utils.Tablet import ColumnType, Tablet
+
+# creating session connection.
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = "root"
+password_ = "root"
+
+# open a session with an initial database (changed below via USE)
+session = Session(ip, port_, username_, password_, sql_dialect="table", 
database="db1")
+session.open(False)
+
+session.execute_non_query_statement("CREATE DATABASE test1")
+session.execute_non_query_statement("CREATE DATABASE test2")
+session.execute_non_query_statement("use test2")
+
+# or use a fully qualified table name
+session.execute_non_query_statement(
+    "create table test1.table1("
+    "region_id STRING ID, plant_id STRING ID, device_id STRING ID, "
+    "model STRING ATTRIBUTE, temperature FLOAT MEASUREMENT, humidity DOUBLE 
MEASUREMENT) with (TTL=3600000)"
+)
+session.execute_non_query_statement(
+    "create table table2("
+    "region_id STRING ID, plant_id STRING ID, color STRING ATTRIBUTE, 
temperature FLOAT MEASUREMENT,"
+    " speed DOUBLE MEASUREMENT) with (TTL=6600000)"
+)
+
+# show tables from current database
+with session.execute_query_statement("SHOW TABLES") as session_data_set:
+    print(session_data_set.get_column_names())
+    while session_data_set.has_next():
+        print(session_data_set.next())
+
+# show tables by specifying another database
+# using SHOW tables FROM
+with session.execute_query_statement("SHOW TABLES FROM test1") as 
session_data_set:
+    print(session_data_set.get_column_names())
+    while session_data_set.has_next():
+        print(session_data_set.next())
+
+session.close()
+
+# specify database in constructor
+session = Session(
+    ip, port_, username_, password_, sql_dialect="table", database="test1"
+)
+session.open(False)
+
+# show tables from current database
+with session.execute_query_statement("SHOW TABLES") as session_data_set:
+    print(session_data_set.get_column_names())
+    while session_data_set.has_next():
+        print(session_data_set.next())
+
+# change database to test2
+session.execute_non_query_statement("use test2")
+
+# show tables from the current database again;
+# after USE it is test2, so no FROM clause is needed
+with session.execute_query_statement("SHOW TABLES") as session_data_set:
+    print(session_data_set.get_column_names())
+    while session_data_set.has_next():
+        print(session_data_set.next())
+
+session.close()
+
+# insert tablet by insert_relational_tablet
+session = Session(ip, port_, username_, password_, sql_dialect="table")
+session.open(False)
+session.execute_non_query_statement("CREATE DATABASE IF NOT EXISTS db1")
+session.execute_non_query_statement('USE "db1"')
+session.execute_non_query_statement(
+    "CREATE TABLE table5 (id1 string id, attr1 string attribute, "
+    + "m1 double "
+    + "measurement)"
+)
+
+column_names = [
+    "id1",
+    "attr1",
+    "m1",
+]
+data_types = [
+    TSDataType.STRING,
+    TSDataType.STRING,
+    TSDataType.DOUBLE,
+]
+column_types = [ColumnType.ID, ColumnType.ATTRIBUTE, ColumnType.MEASUREMENT]
+timestamps = []
+values = []
+for row in range(15):
+    timestamps.append(row)
+    values.append(["id:" + str(row), "attr:" + str(row), row * 1.0])
+tablet = Tablet("table5", column_names, data_types, values, timestamps, 
column_types)
+session.insert_relational_tablet(tablet)
+
+session.execute_non_query_statement("FLush")
+
+np_timestamps = np.arange(15, 30, dtype=np.dtype(">i8"))
+np_values = [
+    np.array(["id:{}".format(i) for i in range(15, 30)]),
+    np.array(["attr:{}".format(i) for i in range(15, 30)]),
+    np.linspace(15.0, 29.0, num=15, dtype=TSDataType.DOUBLE.np_dtype()),
+]
+
+np_tablet = NumpyTablet(
+    "table5",
+    column_names,
+    data_types,
+    np_values,
+    np_timestamps,
+    column_types=column_types,
+)
+session.insert_relational_tablet(np_tablet)
+
+with session.execute_query_statement("select * from table5 order by time") as 
dataset:
+    print(dataset.get_column_names())
+    while dataset.has_next():
+        row_record = dataset.next()
+        # print(row_record.get_fields()[0].get_long_value())
+        # print(row_record.get_fields()[1].get_string_value())
+        # print(row_record.get_fields()[2].get_string_value())
+        # print(row_record.get_fields()[3].get_double_value())
+        print(row_record)
+
+with session.execute_query_statement("select * from table5 order by time") as 
dataset:
+    df = dataset.todf()
+    print(df)
+
+# close session connection.
+session.close()
diff --git a/iotdb-client/client-py/TableModelSessionPoolExample.py 
b/iotdb-client/client-py/TableModelSessionPoolExample.py
new file mode 100644
index 00000000000..44d6c353499
--- /dev/null
+++ b/iotdb-client/client-py/TableModelSessionPoolExample.py
@@ -0,0 +1,160 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import threading
+
+import numpy as np
+
+from iotdb.SessionPool import PoolConfig, SessionPool
+from iotdb.utils.IoTDBConstants import TSDataType
+from iotdb.utils.NumpyTablet import NumpyTablet
+from iotdb.utils.Tablet import ColumnType, Tablet
+
+
+def prepare_data():
+    print("create database")
+    # Get a session from the pool
+    session = session_pool.get_session()
+    session.execute_non_query_statement("CREATE DATABASE IF NOT EXISTS db1")
+    session.execute_non_query_statement('USE "db1"')
+    session.execute_non_query_statement(
+        "CREATE TABLE table0 (id1 string id, attr1 string attribute, "
+        + "m1 double "
+        + "measurement)"
+    )
+    session.execute_non_query_statement(
+        "CREATE TABLE table1 (id1 string id, attr1 string attribute, "
+        + "m1 double "
+        + "measurement)"
+    )
+
+    print("now the tables are:")
+    # show result
+    res = session.execute_query_statement("SHOW TABLES")
+    while res.has_next():
+        print(res.next())
+
+    session_pool.put_back(session)
+
+
+def insert_data(num: int):
+    print("insert data for table" + str(num))
+    # Get a session from the pool
+    session = session_pool.get_session()
+    column_names = [
+        "id1",
+        "attr1",
+        "m1",
+    ]
+    data_types = [
+        TSDataType.STRING,
+        TSDataType.STRING,
+        TSDataType.DOUBLE,
+    ]
+    column_types = [ColumnType.ID, ColumnType.ATTRIBUTE, 
ColumnType.MEASUREMENT]
+    timestamps = []
+    values = []
+    for row in range(15):
+        timestamps.append(row)
+        values.append(["id:" + str(row), "attr:" + str(row), row * 1.0])
+    tablet = Tablet(
+        "table" + str(num), column_names, data_types, values, timestamps, 
column_types
+    )
+    session.insert_relational_tablet(tablet)
+    session.execute_non_query_statement("FLush")
+
+    np_timestamps = np.arange(15, 30, dtype=np.dtype(">i8"))
+    np_values = [
+        np.array(["id:{}".format(i) for i in range(15, 30)]),
+        np.array(["attr:{}".format(i) for i in range(15, 30)]),
+        np.linspace(15.0, 29.0, num=15, dtype=TSDataType.DOUBLE.np_dtype()),
+    ]
+
+    np_tablet = NumpyTablet(
+        "table" + str(num),
+        column_names,
+        data_types,
+        np_values,
+        np_timestamps,
+        column_types=column_types,
+    )
+    session.insert_relational_tablet(np_tablet)
+    session_pool.put_back(session)
+
+
+def query_data():
+    # Get a session from the pool
+    session = session_pool.get_session()
+
+    print("get data from table0")
+    res = session.execute_query_statement("select * from table0")
+    while res.has_next():
+        print(res.next())
+
+    print("get data from table1")
+    res = session.execute_query_statement("select * from table0")
+    while res.has_next():
+        print(res.next())
+
+    session_pool.put_back(session)
+
+
+def delete_data():
+    session = session_pool.get_session()
+    session.execute_non_query_statement("drop database db1")
+    print("data has been deleted. now the databases are:")
+    res = session.execute_statement("show databases")
+    while res.has_next():
+        print(res.next())
+    session_pool.put_back(session)
+
+
+ip = "127.0.0.1"
+port = "6667"
+username = "root"
+password = "root"
+pool_config = PoolConfig(
+    node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
+    user_name=username,
+    password=password,
+    fetch_size=1024,
+    time_zone="UTC+8",
+    max_retry=3,
+    sql_dialect="table",
+    database="db1",
+)
+max_pool_size = 5
+wait_timeout_in_ms = 3000
+
+# Create a session pool
+session_pool = SessionPool(pool_config, max_pool_size, wait_timeout_in_ms)
+
+prepare_data()
+
+insert_thread1 = threading.Thread(target=insert_data, args=(0,))
+insert_thread2 = threading.Thread(target=insert_data, args=(1,))
+
+insert_thread1.start()
+insert_thread2.start()
+
+insert_thread1.join()
+insert_thread2.join()
+
+query_data()
+delete_data()
+session_pool.close()
+print("example is finished!")
diff --git a/iotdb-client/client-py/iotdb/Session.py 
b/iotdb-client/client-py/iotdb/Session.py
index 5aca06c2781..098faf0d520 100644
--- a/iotdb-client/client-py/iotdb/Session.py
+++ b/iotdb-client/client-py/iotdb/Session.py
@@ -73,6 +73,7 @@ class Session(object):
     DEFAULT_PASSWORD = "root"
     DEFAULT_ZONE_ID = time.strftime("%z")
     RETRY_NUM = 3
+    SQL_DIALECT = "tree"
 
     def __init__(
         self,
@@ -83,6 +84,8 @@ class Session(object):
         fetch_size=DEFAULT_FETCH_SIZE,
         zone_id=DEFAULT_ZONE_ID,
         enable_redirection=True,
+        sql_dialect=SQL_DIALECT,
+        database=None,
     ):
         self.__host = host
         self.__port = port
@@ -103,6 +106,8 @@ class Session(object):
         self.__enable_redirection = enable_redirection
         self.__device_id_to_endpoint = None
         self.__endpoint_to_connection = None
+        self.__sql_dialect = sql_dialect
+        self.__database = database
 
     @classmethod
     def init_from_node_urls(
@@ -113,11 +118,21 @@ class Session(object):
         fetch_size=DEFAULT_FETCH_SIZE,
         zone_id=DEFAULT_ZONE_ID,
         enable_redirection=True,
+        sql_dialect=None,
+        database=None,
     ):
         if node_urls is None:
             raise RuntimeError("node urls is empty")
         session = Session(
-            None, None, user, password, fetch_size, zone_id, enable_redirection
+            None,
+            None,
+            user,
+            password,
+            fetch_size,
+            zone_id,
+            enable_redirection,
+            sql_dialect=sql_dialect,
+            database=database,
         )
         session.__hosts = []
         session.__ports = []
@@ -180,12 +195,15 @@ class Session(object):
         else:
             client = 
Client(TBinaryProtocol.TBinaryProtocolAccelerated(transport))
 
+        configuration = {"version": "V_1_0", "sql_dialect": self.__sql_dialect}
+        if self.__database is not None:
+            configuration["db"] = self.__database
         open_req = TSOpenSessionReq(
             client_protocol=self.protocol_version,
             username=self.__user,
             password=self.__password,
             zoneId=self.__zone_id,
-            configuration={"version": "V_1_0"},
+            configuration=configuration,
         )
 
         try:
@@ -890,13 +908,15 @@ class Session(object):
         """
         request = self.gen_insert_tablet_req(tablet)
         try:
-            connection = self.get_connection(tablet.get_device_id())
+            connection = self.get_connection(tablet.get_insert_target_name())
             request.sessionId = connection.session_id
             return Session.verify_success_with_redirection(
                 connection.client.insertTablet(request)
             )
         except RedirectException as e:
-            return self.handle_redirection(tablet.get_device_id(), 
e.redirect_node)
+            return self.handle_redirection(
+                tablet.get_insert_target_name(), e.redirect_node
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
@@ -915,14 +935,14 @@ class Session(object):
         if self.__enable_redirection:
             request_group = {}
             for i in range(len(tablet_lst)):
-                connection = self.get_connection(tablet_lst[i].get_device_id())
+                connection = 
self.get_connection(tablet_lst[i].get_insert_target_name())
                 request = request_group.setdefault(
                     connection.client,
                     TSInsertTabletsReq(
                         connection.session_id, [], [], [], [], [], [], False
                     ),
                 )
-                request.prefixPaths.append(tablet_lst[i].get_device_id())
+                
request.prefixPaths.append(tablet_lst[i].get_insert_target_name())
                 
request.timestampsList.append(tablet_lst[i].get_binary_timestamps())
                 
request.measurementsList.append(tablet_lst[i].get_measurements())
                 request.valuesList.append(tablet_lst[i].get_binary_values())
@@ -981,13 +1001,15 @@ class Session(object):
         """
         request = self.gen_insert_tablet_req(tablet, True)
         try:
-            connection = self.get_connection(tablet.get_device_id())
+            connection = self.get_connection(tablet.get_insert_target_name())
             request.sessionId = connection.session_id
             return Session.verify_success_with_redirection(
                 connection.client.insertTablet(request)
             )
         except RedirectException as e:
-            return self.handle_redirection(tablet.get_device_id(), 
e.redirect_node)
+            return self.handle_redirection(
+                tablet.get_insert_target_name(), e.redirect_node
+            )
         except TTransport.TException as e:
             if self.reconnect():
                 try:
@@ -1006,14 +1028,14 @@ class Session(object):
         if self.__enable_redirection:
             request_group = {}
             for i in range(len(tablet_lst)):
-                connection = self.get_connection(tablet_lst[i].get_device_id())
+                connection = 
self.get_connection(tablet_lst[i].get_insert_target_name())
                 request = request_group.setdefault(
                     connection.client,
                     TSInsertTabletsReq(
                         connection.session_id, [], [], [], [], [], [], True
                     ),
                 )
-                request.prefixPaths.append(tablet_lst[i].get_device_id())
+                
request.prefixPaths.append(tablet_lst[i].get_insert_target_name())
                 
request.timestampsList.append(tablet_lst[i].get_binary_timestamps())
                 
request.measurementsList.append(tablet_lst[i].get_measurements())
                 request.valuesList.append(tablet_lst[i].get_binary_values())
@@ -1058,6 +1080,36 @@ class Session(object):
                         self.connection_error_msg()
                     ) from None
 
+    def insert_relational_tablet(self, tablet):
+        """
+        insert one tablet, for example three columns in table1 can form a 
tablet:
+            timestamps,    id1,  attr1,    m1
+                     1,  id:1,  attr:1,   1.0
+                     2,  id:1,  attr:1,   2.0
+                     3,  id:2,  attr:2,   3.0
+        :param tablet: a tablet specified above
+        """
+        request = self.gen_insert_relational_tablet_req(tablet)
+        try:
+            connection = self.get_connection(tablet.get_insert_target_name())
+            request.sessionId = connection.session_id
+            return Session.verify_success_with_redirection(
+                connection.client.insertTablet(request)
+            )
+        except RedirectException as e:
+            return self.handle_redirection(
+                tablet.get_insert_target_name(), e.redirect_node
+            )
+        except TTransport.TException as e:
+            if self.reconnect():
+                try:
+                    request.sessionId = self.__session_id
+                    return 
Session.verify_success(self.__client.insertTablet(request))
+                except TTransport.TException as e1:
+                    raise IoTDBConnectionException(e1) from None
+            else:
+                raise IoTDBConnectionException(self.connection_error_msg()) 
from None
+
     def insert_records_of_one_device(
         self, device_id, times_list, measurements_list, types_list, values_list
     ):
@@ -1274,13 +1326,27 @@ class Session(object):
     def gen_insert_tablet_req(self, tablet, is_aligned=False):
         return TSInsertTabletReq(
             self.__session_id,
-            tablet.get_device_id(),
+            tablet.get_insert_target_name(),
+            tablet.get_measurements(),
+            tablet.get_binary_values(),
+            tablet.get_binary_timestamps(),
+            tablet.get_data_types(),
+            tablet.get_row_number(),
+            is_aligned,
+        )
+
+    def gen_insert_relational_tablet_req(self, tablet, is_aligned=False):
+        return TSInsertTabletReq(
+            self.__session_id,
+            tablet.get_insert_target_name(),
             tablet.get_measurements(),
             tablet.get_binary_values(),
             tablet.get_binary_timestamps(),
             tablet.get_data_types(),
             tablet.get_row_number(),
             is_aligned,
+            True,
+            tablet.get_column_categories(),
         )
 
     def gen_insert_tablets_req(self, tablet_lst, is_aligned=False):
@@ -1291,7 +1357,7 @@ class Session(object):
         type_lst = []
         size_lst = []
         for tablet in tablet_lst:
-            device_id_lst.append(tablet.get_device_id())
+            device_id_lst.append(tablet.get_insert_target_name())
             measurements_lst.append(tablet.get_measurements())
             values_lst.append(tablet.get_binary_values())
             timestamps_lst.append(tablet.get_binary_timestamps())
@@ -1364,6 +1430,18 @@ class Session(object):
             else:
                 raise IoTDBConnectionException(self.connection_error_msg()) 
from None
 
+        previous_db = self.__database
+        if resp.database is not None:
+            self.__database = resp.database
+        if previous_db != self.__database and self.__endpoint_to_connection is 
not None:
+            iterator = iter(self.__endpoint_to_connection.items())
+            for entry in list(iterator):
+                endpoint, connection = entry
+                if connection != self.__default_connection:
+                    try:
+                        connection.change_database(sql)
+                    except Exception as e:
+                        self.__endpoint_to_connection.pop(endpoint)
         return Session.verify_success(resp.status)
 
     def execute_statement(self, sql: str, timeout=0):
@@ -2222,6 +2300,17 @@ class SessionConnection(object):
         self.session_id = session_id
         self.statement_id = statement_id
 
+    def change_database(self, sql):
+        try:
+            self.client.executeUpdateStatement(
+                TSExecuteStatementReq(self.session_id, sql, self.statement_id)
+            )
+        except TTransport.TException as e:
+            raise IoTDBConnectionException(
+                "failed to change database",
+                e,
+            ) from None
+
     def close_connection(self, req):
         try:
             self.client.closeSession(req)
diff --git a/iotdb-client/client-py/iotdb/SessionPool.py 
b/iotdb-client/client-py/iotdb/SessionPool.py
index 6f1d7580796..9585bfe3e10 100644
--- a/iotdb-client/client-py/iotdb/SessionPool.py
+++ b/iotdb-client/client-py/iotdb/SessionPool.py
@@ -27,6 +27,7 @@ DEFAULT_MULTIPIE = 5
 DEFAULT_FETCH_SIZE = 5000
 DEFAULT_MAX_RETRY = 3
 DEFAULT_TIME_ZONE = "UTC+8"
+SQL_DIALECT = "tree"
 logger = logging.getLogger("IoTDB")
 
 
@@ -42,6 +43,8 @@ class PoolConfig(object):
         time_zone: str = DEFAULT_TIME_ZONE,
         max_retry: int = DEFAULT_MAX_RETRY,
         enable_compression: bool = False,
+        sql_dialect: str = SQL_DIALECT,
+        database: str = None,
     ):
         self.host = host
         self.port = port
@@ -58,6 +61,8 @@ class PoolConfig(object):
         self.time_zone = time_zone
         self.max_retry = max_retry
         self.enable_compression = enable_compression
+        self.sql_dialect = sql_dialect
+        self.database = database
 
 
 class SessionPool(object):
@@ -80,6 +85,9 @@ class SessionPool(object):
                 self.__pool_config.password,
                 self.__pool_config.fetch_size,
                 self.__pool_config.time_zone,
+                enable_redirection=True,
+                sql_dialect=self.__pool_config.sql_dialect,
+                database=self.__pool_config.database,
             )
 
         else:
@@ -90,6 +98,9 @@ class SessionPool(object):
                 self.__pool_config.password,
                 self.__pool_config.fetch_size,
                 self.__pool_config.time_zone,
+                enable_redirection=True,
+                sql_dialect=self.__pool_config.sql_dialect,
+                database=self.__pool_config.database,
             )
 
         session.open(self.__pool_config.enable_compression)
diff --git a/iotdb-client/client-py/iotdb/utils/Field.py 
b/iotdb-client/client-py/iotdb/utils/Field.py
index 913c7594bf5..7b4b05eab72 100644
--- a/iotdb-client/client-py/iotdb/utils/Field.py
+++ b/iotdb-client/client-py/iotdb/utils/Field.py
@@ -38,13 +38,19 @@ class Field(object):
                 output.set_bool_value(field.get_bool_value())
             elif output.get_data_type() == TSDataType.INT32:
                 output.set_int_value(field.get_int_value())
-            elif output.get_data_type() == TSDataType.INT64:
+            elif (
+                output.get_data_type() == TSDataType.INT64
+                or output.get_data_type() == TSDataType.TIMESTAMP
+            ):
                 output.set_long_value(field.get_long_value())
             elif output.get_data_type() == TSDataType.FLOAT:
                 output.set_float_value(field.get_float_value())
             elif output.get_data_type() == TSDataType.DOUBLE:
                 output.set_double_value(field.get_double_value())
-            elif output.get_data_type() == TSDataType.TEXT:
+            elif (
+                output.get_data_type() == TSDataType.TEXT
+                or output.get_data_type() == TSDataType.STRING
+            ):
                 output.set_binary_value(field.get_binary_value())
             else:
                 raise Exception(
@@ -94,6 +100,7 @@ class Field(object):
             raise Exception("Null Field Exception!")
         if (
             self.__data_type != TSDataType.INT64
+            and self.__data_type != TSDataType.TIMESTAMP
             or self.value is None
             or self.value is pd.NA
         ):
@@ -136,6 +143,8 @@ class Field(object):
             raise Exception("Null Field Exception!")
         if (
             self.__data_type != TSDataType.TEXT
+            and self.__data_type != TSDataType.STRING
+            and self.__data_type != TSDataType.BLOB
             or self.value is None
             or self.value is pd.NA
         ):
@@ -145,7 +154,7 @@ class Field(object):
     def get_string_value(self):
         if self.__data_type is None or self.value is None or self.value is 
pd.NA:
             return "None"
-        elif self.__data_type == 5:
+        elif self.__data_type == 5 or self.__data_type == 11:
             return self.value.decode("utf-8")
         else:
             return str(self.get_object_value(self.__data_type))
diff --git a/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py 
b/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py
index 9b671663ff4..daae37d23ba 100644
--- a/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py
+++ b/iotdb-client/client-py/iotdb/utils/IoTDBConstants.py
@@ -27,6 +27,10 @@ class TSDataType(IntEnum):
     FLOAT = 3
     DOUBLE = 4
     TEXT = 5
+    TIMESTAMP = 8
+    DATE = 9
+    BLOB = 10
+    STRING = 11
 
     def np_dtype(self):
         return {
@@ -36,6 +40,8 @@ class TSDataType(IntEnum):
             TSDataType.INT32: np.dtype(">i4"),
             TSDataType.INT64: np.dtype(">i8"),
             TSDataType.TEXT: np.dtype("str"),
+            TSDataType.TIMESTAMP: np.dtype(">i8"),
+            TSDataType.STRING: np.dtype("str"),
         }[self]
 
 
diff --git a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py 
b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
index 3151b2acfd8..2aa885125d8 100644
--- a/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
+++ b/iotdb-client/client-py/iotdb/utils/IoTDBRpcDataSet.py
@@ -182,11 +182,11 @@ class IoTDBRpcDataSet(object):
                 data_array = np.frombuffer(
                     value_buffer, np.dtype(np.int32).newbyteorder(">")
                 )
-            elif data_type == 2:
+            elif data_type == 2 or data_type == 8:
                 data_array = np.frombuffer(
                     value_buffer, np.dtype(np.int64).newbyteorder(">")
                 )
-            elif data_type == 5:
+            elif data_type == 5 or data_type == 11:
                 j = 0
                 offset = 0
                 data_array = []
@@ -225,7 +225,7 @@ class IoTDBRpcDataSet(object):
 
                 if data_type == 1:
                     tmp_array = pd.Series(tmp_array, dtype="Int32")
-                elif data_type == 2:
+                elif data_type == 2 or data_type == 8:
                     tmp_array = pd.Series(tmp_array, dtype="Int64")
                 elif data_type == 0:
                     tmp_array = pd.Series(tmp_array, dtype="boolean")
@@ -297,11 +297,11 @@ class IoTDBRpcDataSet(object):
                     data_array = np.frombuffer(
                         value_buffer, np.dtype(np.int32).newbyteorder(">")
                     )
-                elif data_type == 2:
+                elif data_type == 2 or data_type == 8:
                     data_array = np.frombuffer(
                         value_buffer, np.dtype(np.int64).newbyteorder(">")
                     )
-                elif data_type == 5:
+                elif data_type == 5 or data_type == 11:
                     j = 0
                     offset = 0
                     data_array = []
@@ -327,13 +327,13 @@ class IoTDBRpcDataSet(object):
                 self.__query_data_set.valueList[location] = None
                 tmp_array = []
                 if len(data_array) < total_length:
-                    if data_type == 1 or data_type == 2:
+                    if data_type == 1 or data_type == 2 or data_type == 8:
                         tmp_array = np.full(total_length, np.nan, np.float32)
                     elif data_type == 3 or data_type == 4:
                         tmp_array = np.full(total_length, np.nan, 
data_array.dtype)
                     elif data_type == 0:
                         tmp_array = np.full(total_length, np.nan, np.float32)
-                    elif data_type == 5:
+                    elif data_type == 5 or data_type == 11 or data_type == 10:
                         tmp_array = np.full(total_length, None, 
dtype=data_array.dtype)
 
                     bitmap_buffer = self.__query_data_set.bitmapList[location]
@@ -345,7 +345,7 @@ class IoTDBRpcDataSet(object):
 
                     if data_type == 1:
                         tmp_array = pd.Series(tmp_array).astype("Int32")
-                    elif data_type == 2:
+                    elif data_type == 2 or data_type == 8:
                         tmp_array = pd.Series(tmp_array).astype("Int64")
                     elif data_type == 0:
                         tmp_array = pd.Series(tmp_array).astype("boolean")
diff --git a/iotdb-client/client-py/iotdb/utils/NumpyTablet.py 
b/iotdb-client/client-py/iotdb/utils/NumpyTablet.py
index 4577f7f880c..f00b61de519 100644
--- a/iotdb-client/client-py/iotdb/utils/NumpyTablet.py
+++ b/iotdb-client/client-py/iotdb/utils/NumpyTablet.py
@@ -17,28 +17,45 @@
 #
 
 import struct
+
+from numpy import ndarray
+from typing import List
 from iotdb.utils.IoTDBConstants import TSDataType
 from iotdb.utils.BitMap import BitMap
+from iotdb.utils.Tablet import ColumnType
 
 
 class NumpyTablet(object):
     def __init__(
-        self, device_id, measurements, data_types, values, timestamps, 
bitmaps=None
+        self,
+        insert_target_name: str,
+        column_names: List[str],
+        data_types: List[TSDataType],
+        values: List[ndarray],
+        timestamps: ndarray,
+        bitmaps: List[BitMap] = None,
+        column_types: List[ColumnType] = None,
     ):
         """
         creating a numpy tablet for insertion
-          for example, considering device: root.sg1.d1
+          for example using tree-model, considering device: root.sg1.d1
             timestamps,     m1,    m2,     m3
                      1,  125.3,  True,  text1
                      2,  111.6, False,  text2
                      3,  688.6,  True,  text3
-        Notice: From 0.13.0, the tablet can contain empty cell
-                The tablet will be sorted at the initialization by timestamps
-        :param device_id: String, IoTDB time series path to device layer 
(without sensor)
-        :param measurements: List, sensors
-        :param data_types: TSDataType List, specify value types for sensors
-        :param values: List of numpy array, the values of each column should 
be the inner numpy array
-        :param timestamps: Numpy array, the timestamps
+          for example using table-model, considering table: table1
+            timestamps,    id1,  attr1,    m1
+                     1,  id:1,  attr:1,   1.0
+                     2,  id:1,  attr:1,   2.0
+                     3,  id:2,  attr:2,   3.0
+        Notice: The tablet will be sorted at the initialization by timestamps
+        :param insert_target_name: Str, DeviceId if using tree-view interfaces 
or TableName when using table-view interfaces.
+        :param column_names: Str List, names of columns
+        :param data_types: TSDataType List, specify value types for columns
+        :param values: ndarray List, one ndarray contains the value of one 
column
+        :param timestamps: ndarray, contains the timestamps
+        :param bitmaps: BitMap List, one bitmap per column marking the positions of None values in that column
+        :param column_types: ColumnType List, marking the type of each column, 
can be none for tree-view interfaces.
         """
         if len(values) > 0 and len(values[0]) != len(timestamps):
             raise RuntimeError(
@@ -63,12 +80,18 @@ class NumpyTablet(object):
 
         self.__values = values
         self.__timestamps = timestamps
-        self.__device_id = device_id
-        self.__measurements = measurements
+        self.__insert_target_name = insert_target_name
+        self.__measurements = column_names
         self.__data_types = data_types
         self.__row_number = len(timestamps)
-        self.__column_number = len(measurements)
+        self.__column_number = len(column_names)
         self.bitmaps = bitmaps
+        if column_types is None:
+            self.__column_types = ColumnType.n_copy(
+                ColumnType.MEASUREMENT, self.__column_number
+            )
+        else:
+            self.__column_types = column_types
 
     @staticmethod
     def check_sorted(timestamps):
@@ -83,11 +106,14 @@ class NumpyTablet(object):
     def get_data_types(self):
         return self.__data_types
 
+    def get_column_categories(self):
+        return self.__column_types
+
     def get_row_number(self):
         return self.__row_number
 
-    def get_device_id(self):
-        return self.__device_id
+    def get_insert_target_name(self):
+        return self.__insert_target_name
 
     def get_timestamps(self):
         return self.__timestamps
@@ -102,8 +128,8 @@ class NumpyTablet(object):
         bs_len = 0
         bs_list = []
         for data_type, value in zip(self.__data_types, self.__values):
-            # TEXT
-            if data_type == 5:
+            # TEXT, STRING, BLOB
+            if data_type == 5 or data_type == 11 or data_type == 10:
                 format_str_list = [">"]
                 values_tobe_packed = []
                 for str_list in value:
diff --git a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
index a7ce7dfc40c..c69aca3ebbe 100644
--- a/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
+++ b/iotdb-client/client-py/iotdb/utils/SessionDataSet.py
@@ -124,7 +124,7 @@ class SessionDataSet(object):
     def close_operation_handle(self):
         self.iotdb_rpc_data_set.close()
 
-    def todf(self):
+    def todf(self) -> pd.DataFrame:
         return resultset_to_pandas(self)
 
 
@@ -147,6 +147,8 @@ def get_typed_point(field: Field, none_value=None):
         TSDataType.INT32: lambda field: field.get_int_value(),
         TSDataType.DOUBLE: lambda field: field.get_double_value(),
         TSDataType.INT64: lambda field: field.get_long_value(),
+        TSDataType.TIMESTAMP: lambda field: field.get_long_value(),
+        TSDataType.STRING: lambda field: field.get_string_value(),
     }
 
     result_next_type: TSDataType = field.get_data_type()
diff --git a/iotdb-client/client-py/iotdb/utils/Tablet.py b/iotdb-client/client-py/iotdb/utils/Tablet.py
index 508361dc857..0f086530347 100644
--- a/iotdb-client/client-py/iotdb/utils/Tablet.py
+++ b/iotdb-client/client-py/iotdb/utils/Tablet.py
@@ -17,26 +17,54 @@
 #
 
 import struct
-
+from enum import unique, IntEnum
+from typing import List
 from iotdb.utils.BitMap import BitMap
+from iotdb.utils.IoTDBConstants import TSDataType
+
+
+@unique
+class ColumnType(IntEnum):
+    """Category of a tablet column; the values are fixed integer codes."""
+
+    ID = 0
+    MEASUREMENT = 1
+    ATTRIBUTE = 2
+
+    def n_copy(self, n):
+        """Return a list holding this member repeated ``n`` times."""
+        return [self] * n
 
 
 class Tablet(object):
-    def __init__(self, device_id, measurements, data_types, values, 
timestamps):
+    def __init__(
+        self,
+        insert_target_name: str,
+        column_names: List[str],
+        data_types: List[TSDataType],
+        values: List[List],
+        timestamps: List[int],
+        column_types: List[ColumnType] = None,
+    ):
         """
         creating a tablet for insertion
-          for example, considering device: root.sg1.d1
+          for example using tree-model, considering device: root.sg1.d1
             timestamps,     m1,    m2,     m3
                      1,  125.3,  True,  text1
                      2,  111.6, False,  text2
                      3,  688.6,  True,  text3
-        Notice: From 0.13.0, the tablet can contain empty cell
-                The tablet will be sorted at the initialization by timestamps
-        :param device_id: String, IoTDB time series path to device layer 
(without sensor)
-        :param measurements: List, sensors
-        :param data_types: TSDataType List, specify value types for sensors
+          for example using table-model, considering table: table1
+            timestamps,    id1,  attr1,    m1
+                     1,  id:1,  attr:1,   1.0
+                     2,  id:1,  attr:1,   2.0
+                     3,  id:2,  attr:2,   3.0
+        Notice: The tablet will be sorted at the initialization by timestamps
+        :param insert_target_name: Str, DeviceId if using tree-view interfaces 
or TableName when using table-view interfaces.
+        :param column_names: Str List, names of columns
+        :param data_types: TSDataType List, specify value types for columns
         :param values: 2-D List, the values of each row should be the outer 
list element
-        :param timestamps: List,
+        :param timestamps: int List, contains the timestamps
+        :param column_types: ColumnType List, marking the type of each column, 
can be none for tree-view interfaces.
         """
         if len(timestamps) != len(values):
             raise RuntimeError(
@@ -51,11 +79,17 @@ class Tablet(object):
             self.__values = values
             self.__timestamps = timestamps
 
-        self.__device_id = device_id
-        self.__measurements = measurements
+        self.__insert_target_name = insert_target_name
+        self.__measurements = column_names
         self.__data_types = data_types
         self.__row_number = len(timestamps)
-        self.__column_number = len(measurements)
+        self.__column_number = len(column_names)
+        if column_types is None:
+            self.__column_types = ColumnType.n_copy(
+                ColumnType.MEASUREMENT, self.__column_number
+            )
+        else:
+            self.__column_types = column_types
 
     @staticmethod
     def check_sorted(timestamps):
@@ -70,11 +104,14 @@ class Tablet(object):
     def get_data_types(self):
         return self.__data_types
 
+    def get_column_categories(self):
+        return self.__column_types
+
     def get_row_number(self):
         return self.__row_number
 
-    def get_device_id(self):
-        return self.__device_id
+    def get_insert_target_name(self):
+        return self.__insert_target_name
 
     def get_binary_timestamps(self):
         format_str_list = [">"]
@@ -150,7 +187,7 @@ class Tablet(object):
                         self.__mark_none_value(bitmaps, i, j)
                         has_none = True
 
-            elif data_type_value == 5:
+            elif data_type_value == 5 or data_type_value == 11:
                 for j in range(self.__row_number):
                     if self.__values[j][i] is not None:
                         if isinstance(self.__values[j][i], str):
diff --git a/iotdb-client/client-py/tests/integration/test_relational_session.py b/iotdb-client/client-py/tests/integration/test_relational_session.py
new file mode 100644
index 00000000000..cb5382ddb49
--- /dev/null
+++ b/iotdb-client/client-py/tests/integration/test_relational_session.py
@@ -0,0 +1,143 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import numpy as np
+
+from iotdb.Session import Session
+from iotdb.SessionPool import PoolConfig, create_session_pool
+from iotdb.utils.IoTDBConstants import TSDataType
+from iotdb.utils.NumpyTablet import NumpyTablet
+from iotdb.utils.Tablet import Tablet, ColumnType
+from iotdb.IoTDBContainer import IoTDBContainer
+
+
+def test_session():
+    """Exercise the table-model round trip through a directly created session."""
+    session_test(use_session_pool=False)
+
+
+def test_session_pool():
+    """Exercise the table-model round trip through a pooled session."""
+    session_test(use_session_pool=True)
+
+
+def session_test(use_session_pool=False):
+    """Run the table-model round trip against a throwaway IoTDB container.
+
+    Creates ``db1.table5``, inserts rows 0-14 through a ``Tablet`` and rows
+    15-29 through a ``NumpyTablet``, then reads everything back both row by
+    row and as a DataFrame.
+
+    :param use_session_pool: when True the session is taken from a session
+        pool built from a ``PoolConfig``; otherwise a ``Session`` is
+        constructed directly.
+    """
+    with IoTDBContainer("iotdb:dev") as db:
+        db: IoTDBContainer
+
+        if use_session_pool:
+            pool_config = PoolConfig(
+                db.get_container_host_ip(),
+                db.get_exposed_port(6667),
+                "root",
+                "root",
+                None,
+                1024,
+                "Asia/Shanghai",
+                3,
+                sql_dialect="table",
+            )
+            session_pool = create_session_pool(pool_config, 1, 3000)
+            session = session_pool.get_session()
+        else:
+            session = Session(
+                db.get_container_host_ip(),
+                db.get_exposed_port(6667),
+                sql_dialect="table",
+            )
+        session.open(False)
+
+        # Fail the test through pytest instead of terminating the interpreter.
+        assert session.is_open(), "can't open session"
+
+        # NOTE(review): the pooled session is closed directly and the pool
+        # itself is never closed - confirm whether it should be handed back
+        # to the pool instead.
+        try:
+            session.execute_non_query_statement("CREATE DATABASE IF NOT EXISTS db1")
+            session.execute_non_query_statement('USE "db1"')
+            session.execute_non_query_statement(
+                "CREATE TABLE table5 (id1 string id, attr1 string attribute, "
+                + "m1 double "
+                + "measurement)"
+            )
+
+            column_names = [
+                "id1",
+                "attr1",
+                "m1",
+            ]
+            data_types = [
+                TSDataType.STRING,
+                TSDataType.STRING,
+                TSDataType.DOUBLE,
+            ]
+            column_types = [
+                ColumnType.ID,
+                ColumnType.ATTRIBUTE,
+                ColumnType.MEASUREMENT,
+            ]
+
+            # Rows 0-14 go through the list-based Tablet.
+            timestamps = []
+            values = []
+            for row in range(15):
+                timestamps.append(row)
+                values.append(["id:" + str(row), "attr:" + str(row), row * 1.0])
+            tablet = Tablet(
+                "table5", column_names, data_types, values, timestamps, column_types
+            )
+            session.insert_relational_tablet(tablet)
+
+            session.execute_non_query_statement("FLush")
+
+            # Rows 15-29 go through the numpy-based tablet.
+            np_timestamps = np.arange(15, 30, dtype=np.dtype(">i8"))
+            np_values = [
+                np.array(["id:{}".format(i) for i in range(15, 30)]),
+                np.array(["attr:{}".format(i) for i in range(15, 30)]),
+                np.linspace(15.0, 29.0, num=15, dtype=TSDataType.DOUBLE.np_dtype()),
+            ]
+
+            np_tablet = NumpyTablet(
+                "table5",
+                column_names,
+                data_types,
+                np_values,
+                np_timestamps,
+                column_types=column_types,
+            )
+            session.insert_relational_tablet(np_tablet)
+
+            # Every row was written so that its id, attribute and measurement
+            # are all derived from its timestamp; verify that row by row.
+            with session.execute_query_statement(
+                "select * from table5 order by time"
+            ) as dataset:
+                cnt = 0
+                while dataset.has_next():
+                    row_record = dataset.next()
+                    timestamp = row_record.get_fields()[0].get_long_value()
+                    assert (
+                        "id:" + str(timestamp)
+                        == row_record.get_fields()[1].get_string_value()
+                    )
+                    assert (
+                        "attr:" + str(timestamp)
+                        == row_record.get_fields()[2].get_string_value()
+                    )
+                    assert (
+                        timestamp * 1.0
+                        == row_record.get_fields()[3].get_double_value()
+                    )
+                    cnt += 1
+                assert 30 == cnt
+
+            # The same query converted to a DataFrame: 30 rows, 4 columns.
+            with session.execute_query_statement(
+                "select * from table5 order by time"
+            ) as dataset:
+                df = dataset.todf()
+                rows, columns = df.shape
+                assert rows == 30
+                assert columns == 4
+        finally:
+            # close session connection, even when a statement or assertion
+            # above fails.
+            session.close()


Reply via email to