This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 83c59ef43c [IOTDB-3033] Completing the query and writing interface 
compared to java (#5713)
83c59ef43c is described below

commit 83c59ef43cbbf00265b8b6f2bbab59c519efc2ef
Author: Tom <[email protected]>
AuthorDate: Thu Apr 28 18:05:11 2022 +0800

    [IOTDB-3033] Completing the query and writing interface compared to java 
(#5713)
    
    Co-authored-by: jade.deng <[email protected]>
---
 client-py/SessionAlignedTimeseriesExample.py     |  18 +++
 client-py/SessionExample.py                      |  30 +++++
 client-py/iotdb/Session.py                       | 132 +++++++++++++++++++-
 client-py/tests/tablet_performance_comparison.py |   4 +-
 client-py/tests/test_one_device.py               | 152 +++++++++++++++++++++++
 5 files changed, 332 insertions(+), 4 deletions(-)

diff --git a/client-py/SessionAlignedTimeseriesExample.py 
b/client-py/SessionAlignedTimeseriesExample.py
index 0787fe91c3..0d8271faa7 100644
--- a/client-py/SessionAlignedTimeseriesExample.py
+++ b/client-py/SessionAlignedTimeseriesExample.py
@@ -201,6 +201,24 @@ with session.execute_query_statement(
     while session_data_set.has_next():
         print(session_data_set.next())
 
+# insert aligned string record into the database.
+time_list = [1, 2, 3]
+measurements_list = [
+    ["s_01", "s_02", "s_03"],
+    ["s_01", "s_02", "s_03"],
+    ["s_01", "s_02", "s_03"],
+]
+values_list = [["False", "22", "33"], ["True", "1", "23"], ["False", "15", 
"26"]]
+session.insert_aligned_string_records_of_one_device(
+    "root.sg_test_01.d_04",
+    time_list,
+    measurements_list,
+    values_list,
+)
+
+# delete storage group
+session.delete_storage_group("root.sg_test_01")
+
 # close session connection.
 session.close()
 
diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index bbc9669527..93aa839c3b 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -280,6 +280,36 @@ with session.execute_query_statement(
     while session_data_set.has_next():
         print(session_data_set.next())
 
+# insert string records of one device
+time_list = [1, 2, 3]
+measurements_list = [
+    ["s_01", "s_02", "s_03"],
+    ["s_01", "s_02", "s_03"],
+    ["s_01", "s_02", "s_03"],
+]
+values_list = [["False", "22", "33"], ["True", "1", "23"], ["False", "15", 
"26"]]
+
+session.insert_string_records_of_one_device(
+    "root.sg_test_01.d_03",
+    time_list,
+    measurements_list,
+    values_list,
+)
+
+with session.execute_raw_data_query(
+    ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 1, 4
+) as session_data_set:
+    session_data_set.set_fetch_size(1024)
+    while session_data_set.has_next():
+        print(session_data_set.next())
+
+with session.execute_last_data_query(
+    ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 0
+) as session_data_set:
+    session_data_set.set_fetch_size(1024)
+    while session_data_set.has_next():
+        print(session_data_set.next())
+
 # delete storage group
 session.delete_storage_group("root.sg_test_01")
 
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 819a0dd892..780ecc7d7c 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -34,13 +34,19 @@ from .thrift.rpc.TSIService import (
     TSExecuteStatementReq,
     TSOpenSessionReq,
     TSCreateMultiTimeseriesReq,
-    TSCreateSchemaTemplateReq,
     TSCloseSessionReq,
     TSInsertTabletsReq,
     TSInsertRecordsReq,
     TSInsertRecordsOfOneDeviceReq,
 )
-from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion, 
TSSetTimeZoneReq
+from .thrift.rpc.ttypes import (
+    TSDeleteDataReq,
+    TSProtocolVersion,
+    TSSetTimeZoneReq,
+    TSRawDataQueryReq,
+    TSLastDataQueryReq,
+    TSInsertStringRecordsOfOneDeviceReq,
+)
 
 # for debug
 # from IoTDBConstants import *
@@ -1023,3 +1029,125 @@ class Session(object):
 
         logger.error("error status is", status)
         return -1
+
+    def execute_raw_data_query(
+        self, paths: list, start_time: int, end_time: int
+    ) -> SessionDataSet:
+        request = TSRawDataQueryReq(
+            self.__session_id,
+            paths,
+            self.__fetch_size,
+            startTime=start_time,
+            endTime=end_time,
+            statementId=self.__statement_id,
+            enableRedirectQuery=False,
+        )
+        resp = self.__client.executeRawDataQuery(request)
+        return SessionDataSet(
+            "",
+            resp.columns,
+            resp.dataTypeList,
+            resp.columnNameIndexMap,
+            resp.queryId,
+            self.__client,
+            self.__statement_id,
+            self.__session_id,
+            resp.queryDataSet,
+            resp.ignoreTimeStamp,
+        )
+
+    def execute_last_data_query(self, paths: list, last_time: int) -> 
SessionDataSet:
+        request = TSLastDataQueryReq(
+            self.__session_id,
+            paths,
+            self.__fetch_size,
+            last_time,
+            self.__statement_id,
+            enableRedirectQuery=False,
+        )
+
+        resp = self.__client.executeLastDataQuery(request)
+        return SessionDataSet(
+            "",
+            resp.columns,
+            resp.dataTypeList,
+            resp.columnNameIndexMap,
+            resp.queryId,
+            self.__client,
+            self.__statement_id,
+            self.__session_id,
+            resp.queryDataSet,
+            resp.ignoreTimeStamp,
+        )
+
+    def insert_string_records_of_one_device(
+        self,
+        device_id: str,
+        times: list,
+        measurements_list: list,
+        values_list: list,
+        have_sorted: bool = False,
+    ):
+        if (len(times) != len(measurements_list)) or (len(times) != 
len(values_list)):
+            raise RuntimeError(
+                "insert records of one device error: times, measurementsList 
and valuesList's size should be equal!"
+            )
+        request = self.gen_insert_string_records_of_one_device_request(
+            device_id, times, measurements_list, values_list, have_sorted, 
False
+        )
+        status = self.__client.insertStringRecordsOfOneDevice(request)
+        logger.debug(
+            "insert one device {} message: {}".format(device_id, 
status.message)
+        )
+
+        return Session.verify_success(status)
+
+    def insert_aligned_string_records_of_one_device(
+        self,
+        device_id: str,
+        times: list,
+        measurements_list: list,
+        values: list,
+        have_sorted: bool = False,
+    ):
+        if (len(times) != len(measurements_list)) or (len(times) != 
len(values)):
+            raise RuntimeError(
+                "insert records of one device error: times, measurementsList 
and valuesList's size should be equal!"
+            )
+        request = self.gen_insert_string_records_of_one_device_request(
+            device_id, times, measurements_list, values, have_sorted, True
+        )
+        status = self.__client.insertStringRecordsOfOneDevice(request)
+        logger.debug(
+            "insert one device {} message: {}".format(device_id, 
status.message)
+        )
+
+        return Session.verify_success(status)
+
+    def gen_insert_string_records_of_one_device_request(
+        self,
+        device_id,
+        times,
+        measurements_list,
+        values_list,
+        have_sorted,
+        is_aligned=False,
+    ):
+        if (len(times) != len(measurements_list)) or (len(times) != 
len(values_list)):
+            raise RuntimeError(
+                "insert records of one device error: times, measurementsList 
and valuesList's size should be equal!"
+            )
+        if not Session.check_sorted(times):
+            # sort by timestamp
+            sorted_zipped = sorted(zip(times, measurements_list, values_list))
+            result = zip(*sorted_zipped)
+            times_list, measurements_list, values_list = [list(x) for x in 
result]
+        request = TSInsertStringRecordsOfOneDeviceReq(
+            self.__session_id,
+            device_id,
+            measurements_list,
+            values_list,
+            times,
+            is_aligned,
+        )
+        return request
diff --git a/client-py/tests/tablet_performance_comparison.py 
b/client-py/tests/tablet_performance_comparison.py
index 3626e818a8..ef5847140c 100644
--- a/client-py/tests/tablet_performance_comparison.py
+++ b/client-py/tests/tablet_performance_comparison.py
@@ -75,9 +75,9 @@ def generate_csv_data(
         if _type == TSDataType.BOOLEAN:
             return [random.randint(0, 1) == 1 for _ in range(_row)]
         elif _type == TSDataType.INT32:
-            return [random.randint(-(2**31), 2**31) for _ in range(_row)]
+            return [random.randint(-(2 ** 31), 2 ** 31) for _ in range(_row)]
         elif _type == TSDataType.INT64:
-            return [random.randint(-(2**63), 2**63) for _ in range(_row)]
+            return [random.randint(-(2 ** 63), 2 ** 63) for _ in range(_row)]
         elif _type == TSDataType.FLOAT:
             return [1.5 for _ in range(_row)]
         elif _type == TSDataType.DOUBLE:
diff --git a/client-py/tests/test_one_device.py 
b/client-py/tests/test_one_device.py
new file mode 100644
index 0000000000..c364cd1105
--- /dev/null
+++ b/client-py/tests/test_one_device.py
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Uncomment the following line to use apache-iotdb module installed by pip3
+
+from iotdb.Session import Session
+from iotdb.IoTDBContainer import IoTDBContainer
+
+# whether the test has passed
+final_flag = True
+failed_count = 0
+
+
+def test_fail():
+    global failed_count
+    global final_flag
+    final_flag = False
+    failed_count += 1
+
+
+def print_message(message):
+    print("*********")
+    print(message)
+    print("*********")
+
+
+def test_one_device():
+    with IoTDBContainer("iotdb:dev") as db:
+        db: IoTDBContainer
+        session = Session(db.get_container_host_ip(), 
db.get_exposed_port(6667))
+        session.open(False)
+
+        if not session.is_open():
+            print("can't open session")
+            exit(1)
+
+        # insert string records of one device
+        time_list = [1, 2, 3]
+        measurements_list = [
+            ["s_01", "s_02", "s_03"],
+            ["s_01", "s_02", "s_03"],
+            ["s_01", "s_02", "s_03"],
+        ]
+        values_list = [
+            ["False", "22", "33"],
+            ["True", "1", "23"],
+            ["False", "15", "26"],
+        ]
+
+        if (
+            session.insert_string_records_of_one_device(
+                "root.str_test_01.d_01",
+                time_list,
+                measurements_list,
+                values_list,
+            )
+            < 0
+        ):
+            test_fail()
+            print_message("insert string records of one device failed")
+
+        # insert aligned string record into the database.
+        time_list = [1, 2, 3]
+        measurements_list = [
+            ["s_01", "s_02", "s_03"],
+            ["s_01", "s_02", "s_03"],
+            ["s_01", "s_02", "s_03"],
+        ]
+        values_list = [
+            ["False", "22", "33"],
+            ["True", "1", "23"],
+            ["False", "15", "26"],
+        ]
+
+        if (
+            session.insert_aligned_string_records_of_one_device(
+                "root.str_test_01.d_02",
+                time_list,
+                measurements_list,
+                values_list,
+            )
+            < 0
+        ):
+            test_fail()
+            print_message("insert aligned record of one device failed")
+
+        # execute raw data query sql statement
+        session_data_set = session.execute_raw_data_query(
+            ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1, 4
+        )
+        session_data_set.set_fetch_size(1024)
+        expect_count = 3
+        actual_count = 0
+        while session_data_set.has_next():
+            print(session_data_set.next())
+            actual_count += 1
+        session_data_set.close_operation_handle()
+
+        if actual_count != expect_count:
+            test_fail()
+            print_message(
+                "query count mismatch: expect count: "
+                + str(expect_count)
+                + " actual count: "
+                + str(actual_count)
+            )
+        assert actual_count == expect_count
+
+        # execute last data query sql statement
+        session_data_set = session.execute_last_data_query(
+            ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 0
+        )
+        session_data_set.set_fetch_size(1024)
+        expect_time = 3
+        actual_time = session_data_set.next().get_timestamp()
+        session_data_set.close_operation_handle()
+
+        if actual_time != expect_time:
+            test_fail()
+            print_message(
+                "query count mismatch: expect count: "
+                + str(expect_time)
+                + " actual count: "
+                + str(actual_time)
+            )
+        assert actual_time == expect_time
+
+        # close session connection.
+        session.close()
+
+
+if final_flag:
+    print("All executions done!!")
+else:
+    print("Some test failed, please have a check")
+    print("failed count: ", failed_count)
+    exit(1)

Reply via email to