This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 83c59ef43c [IOTDB-3033] Completing the query and writing interface
compared to java (#5713)
83c59ef43c is described below
commit 83c59ef43cbbf00265b8b6f2bbab59c519efc2ef
Author: Tom <[email protected]>
AuthorDate: Thu Apr 28 18:05:11 2022 +0800
[IOTDB-3033] Completing the query and writing interface compared to java
(#5713)
Co-authored-by: jade.deng <[email protected]>
---
client-py/SessionAlignedTimeseriesExample.py | 18 +++
client-py/SessionExample.py | 30 +++++
client-py/iotdb/Session.py | 132 +++++++++++++++++++-
client-py/tests/tablet_performance_comparison.py | 4 +-
client-py/tests/test_one_device.py | 152 +++++++++++++++++++++++
5 files changed, 332 insertions(+), 4 deletions(-)
diff --git a/client-py/SessionAlignedTimeseriesExample.py
b/client-py/SessionAlignedTimeseriesExample.py
index 0787fe91c3..0d8271faa7 100644
--- a/client-py/SessionAlignedTimeseriesExample.py
+++ b/client-py/SessionAlignedTimeseriesExample.py
@@ -201,6 +201,24 @@ with session.execute_query_statement(
while session_data_set.has_next():
print(session_data_set.next())
+# insert aligned string record into the database.
+time_list = [1, 2, 3]
+measurements_list = [
+ ["s_01", "s_02", "s_03"],
+ ["s_01", "s_02", "s_03"],
+ ["s_01", "s_02", "s_03"],
+]
+values_list = [["False", "22", "33"], ["True", "1", "23"], ["False", "15",
"26"]]
+session.insert_aligned_string_records_of_one_device(
+ "root.sg_test_01.d_04",
+ time_list,
+ measurements_list,
+ values_list,
+)
+
+# delete storage group
+session.delete_storage_group("root.sg_test_01")
+
# close session connection.
session.close()
diff --git a/client-py/SessionExample.py b/client-py/SessionExample.py
index bbc9669527..93aa839c3b 100644
--- a/client-py/SessionExample.py
+++ b/client-py/SessionExample.py
@@ -280,6 +280,36 @@ with session.execute_query_statement(
while session_data_set.has_next():
print(session_data_set.next())
+# insert string records of one device
+time_list = [1, 2, 3]
+measurements_list = [
+ ["s_01", "s_02", "s_03"],
+ ["s_01", "s_02", "s_03"],
+ ["s_01", "s_02", "s_03"],
+]
+values_list = [["False", "22", "33"], ["True", "1", "23"], ["False", "15",
"26"]]
+
+session.insert_string_records_of_one_device(
+ "root.sg_test_01.d_03",
+ time_list,
+ measurements_list,
+ values_list,
+)
+
+with session.execute_raw_data_query(
+ ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 1, 4
+) as session_data_set:
+ session_data_set.set_fetch_size(1024)
+ while session_data_set.has_next():
+ print(session_data_set.next())
+
+with session.execute_last_data_query(
+ ["root.sg_test_01.d_03.s_01", "root.sg_test_01.d_03.s_02"], 0
+) as session_data_set:
+ session_data_set.set_fetch_size(1024)
+ while session_data_set.has_next():
+ print(session_data_set.next())
+
# delete storage group
session.delete_storage_group("root.sg_test_01")
diff --git a/client-py/iotdb/Session.py b/client-py/iotdb/Session.py
index 819a0dd892..780ecc7d7c 100644
--- a/client-py/iotdb/Session.py
+++ b/client-py/iotdb/Session.py
@@ -34,13 +34,19 @@ from .thrift.rpc.TSIService import (
TSExecuteStatementReq,
TSOpenSessionReq,
TSCreateMultiTimeseriesReq,
- TSCreateSchemaTemplateReq,
TSCloseSessionReq,
TSInsertTabletsReq,
TSInsertRecordsReq,
TSInsertRecordsOfOneDeviceReq,
)
-from .thrift.rpc.ttypes import TSDeleteDataReq, TSProtocolVersion,
TSSetTimeZoneReq
+from .thrift.rpc.ttypes import (
+ TSDeleteDataReq,
+ TSProtocolVersion,
+ TSSetTimeZoneReq,
+ TSRawDataQueryReq,
+ TSLastDataQueryReq,
+ TSInsertStringRecordsOfOneDeviceReq,
+)
# for debug
# from IoTDBConstants import *
@@ -1023,3 +1029,125 @@ class Session(object):
logger.error("error status is", status)
return -1
+
+ def execute_raw_data_query(
+ self, paths: list, start_time: int, end_time: int
+ ) -> SessionDataSet:
+ request = TSRawDataQueryReq(
+ self.__session_id,
+ paths,
+ self.__fetch_size,
+ startTime=start_time,
+ endTime=end_time,
+ statementId=self.__statement_id,
+ enableRedirectQuery=False,
+ )
+ resp = self.__client.executeRawDataQuery(request)
+ return SessionDataSet(
+ "",
+ resp.columns,
+ resp.dataTypeList,
+ resp.columnNameIndexMap,
+ resp.queryId,
+ self.__client,
+ self.__statement_id,
+ self.__session_id,
+ resp.queryDataSet,
+ resp.ignoreTimeStamp,
+ )
+
+ def execute_last_data_query(self, paths: list, last_time: int) ->
SessionDataSet:
+ request = TSLastDataQueryReq(
+ self.__session_id,
+ paths,
+ self.__fetch_size,
+ last_time,
+ self.__statement_id,
+ enableRedirectQuery=False,
+ )
+
+ resp = self.__client.executeLastDataQuery(request)
+ return SessionDataSet(
+ "",
+ resp.columns,
+ resp.dataTypeList,
+ resp.columnNameIndexMap,
+ resp.queryId,
+ self.__client,
+ self.__statement_id,
+ self.__session_id,
+ resp.queryDataSet,
+ resp.ignoreTimeStamp,
+ )
+
+ def insert_string_records_of_one_device(
+ self,
+ device_id: str,
+ times: list,
+ measurements_list: list,
+ values_list: list,
+ have_sorted: bool = False,
+ ):
+ if (len(times) != len(measurements_list)) or (len(times) !=
len(values_list)):
+ raise RuntimeError(
+ "insert records of one device error: times, measurementsList
and valuesList's size should be equal!"
+ )
+ request = self.gen_insert_string_records_of_one_device_request(
+ device_id, times, measurements_list, values_list, have_sorted,
False
+ )
+ status = self.__client.insertStringRecordsOfOneDevice(request)
+ logger.debug(
+ "insert one device {} message: {}".format(device_id,
status.message)
+ )
+
+ return Session.verify_success(status)
+
+ def insert_aligned_string_records_of_one_device(
+ self,
+ device_id: str,
+ times: list,
+ measurements_list: list,
+ values: list,
+ have_sorted: bool = False,
+ ):
+ if (len(times) != len(measurements_list)) or (len(times) !=
len(values)):
+ raise RuntimeError(
+ "insert records of one device error: times, measurementsList
and valuesList's size should be equal!"
+ )
+ request = self.gen_insert_string_records_of_one_device_request(
+ device_id, times, measurements_list, values, have_sorted, True
+ )
+ status = self.__client.insertStringRecordsOfOneDevice(request)
+ logger.debug(
+ "insert one device {} message: {}".format(device_id,
status.message)
+ )
+
+ return Session.verify_success(status)
+
+ def gen_insert_string_records_of_one_device_request(
+ self,
+ device_id,
+ times,
+ measurements_list,
+ values_list,
+ have_sorted,
+ is_aligned=False,
+ ):
+ if (len(times) != len(measurements_list)) or (len(times) !=
len(values_list)):
+ raise RuntimeError(
+ "insert records of one device error: times, measurementsList
and valuesList's size should be equal!"
+ )
+ if not Session.check_sorted(times):
+ # sort by timestamp
+ sorted_zipped = sorted(zip(times, measurements_list, values_list))
+ result = zip(*sorted_zipped)
+ times_list, measurements_list, values_list = [list(x) for x in
result]
+ request = TSInsertStringRecordsOfOneDeviceReq(
+ self.__session_id,
+ device_id,
+ measurements_list,
+ values_list,
+ times,
+ is_aligned,
+ )
+ return request
diff --git a/client-py/tests/tablet_performance_comparison.py
b/client-py/tests/tablet_performance_comparison.py
index 3626e818a8..ef5847140c 100644
--- a/client-py/tests/tablet_performance_comparison.py
+++ b/client-py/tests/tablet_performance_comparison.py
@@ -75,9 +75,9 @@ def generate_csv_data(
if _type == TSDataType.BOOLEAN:
return [random.randint(0, 1) == 1 for _ in range(_row)]
elif _type == TSDataType.INT32:
- return [random.randint(-(2**31), 2**31) for _ in range(_row)]
+ return [random.randint(-(2 ** 31), 2 ** 31) for _ in range(_row)]
elif _type == TSDataType.INT64:
- return [random.randint(-(2**63), 2**63) for _ in range(_row)]
+ return [random.randint(-(2 ** 63), 2 ** 63) for _ in range(_row)]
elif _type == TSDataType.FLOAT:
return [1.5 for _ in range(_row)]
elif _type == TSDataType.DOUBLE:
diff --git a/client-py/tests/test_one_device.py
b/client-py/tests/test_one_device.py
new file mode 100644
index 0000000000..c364cd1105
--- /dev/null
+++ b/client-py/tests/test_one_device.py
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Uncomment the following line to use apache-iotdb module installed by pip3
+
+from iotdb.Session import Session
+from iotdb.IoTDBContainer import IoTDBContainer
+
+# whether the test has passed
+final_flag = True
+failed_count = 0
+
+
+def test_fail():
+ global failed_count
+ global final_flag
+ final_flag = False
+ failed_count += 1
+
+
+def print_message(message):
+ print("*********")
+ print(message)
+ print("*********")
+
+
+def test_one_device():
+ with IoTDBContainer("iotdb:dev") as db:
+ db: IoTDBContainer
+ session = Session(db.get_container_host_ip(),
db.get_exposed_port(6667))
+ session.open(False)
+
+ if not session.is_open():
+ print("can't open session")
+ exit(1)
+
+ # insert string records of one device
+ time_list = [1, 2, 3]
+ measurements_list = [
+ ["s_01", "s_02", "s_03"],
+ ["s_01", "s_02", "s_03"],
+ ["s_01", "s_02", "s_03"],
+ ]
+ values_list = [
+ ["False", "22", "33"],
+ ["True", "1", "23"],
+ ["False", "15", "26"],
+ ]
+
+ if (
+ session.insert_string_records_of_one_device(
+ "root.str_test_01.d_01",
+ time_list,
+ measurements_list,
+ values_list,
+ )
+ < 0
+ ):
+ test_fail()
+ print_message("insert string records of one device failed")
+
+ # insert aligned string record into the database.
+ time_list = [1, 2, 3]
+ measurements_list = [
+ ["s_01", "s_02", "s_03"],
+ ["s_01", "s_02", "s_03"],
+ ["s_01", "s_02", "s_03"],
+ ]
+ values_list = [
+ ["False", "22", "33"],
+ ["True", "1", "23"],
+ ["False", "15", "26"],
+ ]
+
+ if (
+ session.insert_aligned_string_records_of_one_device(
+ "root.str_test_01.d_02",
+ time_list,
+ measurements_list,
+ values_list,
+ )
+ < 0
+ ):
+ test_fail()
+ print_message("insert aligned record of one device failed")
+
+ # execute raw data query sql statement
+ session_data_set = session.execute_raw_data_query(
+ ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 1, 4
+ )
+ session_data_set.set_fetch_size(1024)
+ expect_count = 3
+ actual_count = 0
+ while session_data_set.has_next():
+ print(session_data_set.next())
+ actual_count += 1
+ session_data_set.close_operation_handle()
+
+ if actual_count != expect_count:
+ test_fail()
+ print_message(
+ "query count mismatch: expect count: "
+ + str(expect_count)
+ + " actual count: "
+ + str(actual_count)
+ )
+ assert actual_count == expect_count
+
+ # execute last data query sql statement
+ session_data_set = session.execute_last_data_query(
+ ["root.str_test_01.d_02.s_01", "root.str_test_01.d_02.s_02"], 0
+ )
+ session_data_set.set_fetch_size(1024)
+ expect_time = 3
+ actual_time = session_data_set.next().get_timestamp()
+ session_data_set.close_operation_handle()
+
+ if actual_time != expect_time:
+ test_fail()
+ print_message(
+ "query count mismatch: expect count: "
+ + str(expect_time)
+ + " actual count: "
+ + str(actual_time)
+ )
+ assert actual_time == expect_time
+
+ # close session connection.
+ session.close()
+
+
+if final_flag:
+ print("All executions done!!")
+else:
+ print("Some test failed, please have a check")
+ print("failed count: ", failed_count)
+ exit(1)