This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a7fbca63206 [Py-client] Support insert_records with None values
(#14272)
a7fbca63206 is described below
commit a7fbca6320631a2724daf032bf8127f9a72530ab
Author: Haonan <[email protected]>
AuthorDate: Tue Dec 3 14:37:36 2024 +0800
[Py-client] Support insert_records with None values (#14272)
* Support insert_records with None values
* Support insert_records with None values
* Fix bug
---
iotdb-client/client-py/iotdb/Session.py | 115 +++++++++++++++++++++
.../client-py/tests/integration/test_session.py | 100 ++++++++++++++----
2 files changed, 196 insertions(+), 19 deletions(-)
diff --git a/iotdb-client/client-py/iotdb/Session.py
b/iotdb-client/client-py/iotdb/Session.py
index 9926eb1273a..bad50642820 100644
--- a/iotdb-client/client-py/iotdb/Session.py
+++ b/iotdb-client/client-py/iotdb/Session.py
@@ -510,6 +510,15 @@ class Session(object):
string_values = [string_values]
if type(measurements) == str:
measurements = [measurements]
+ if self.__has_none_value(string_values):
+ filtered_measurements, filtered_values = zip(
+ *[(m, v) for m, v in zip(measurements, string_values) if v is
not None]
+ )
+ measurements = list(filtered_measurements)
+ values = list(filtered_values)
+ if len(measurements) is 0 or len(values) is 0:
+ logger.info("All inserting values are none!")
+ return
request = self.gen_insert_str_record_req(
device_id, timestamp, measurements, string_values
)
@@ -541,6 +550,15 @@ class Session(object):
string_values = [string_values]
if type(measurements) == str:
measurements = [measurements]
+ if self.__has_none_value(string_values):
+ filtered_measurements, filtered_values = zip(
+ *[(m, v) for m, v in zip(measurements, string_values) if v is
not None]
+ )
+ measurements = list(filtered_measurements)
+ values = list(filtered_values)
+ if len(measurements) is 0 or len(values) is 0:
+ logger.info("All inserting values are none!")
+ return
request = self.gen_insert_str_record_req(
device_id, timestamp, measurements, string_values, True
)
@@ -576,6 +594,20 @@ class Session(object):
:param data_types: List of TSDataType, indicate the data type for each
sensor
:param values: List, values to be inserted, for each sensor
"""
+ if self.__has_none_value(values):
+ filtered_measurements, filtered_data_types, filtered_values = zip(
+ *[
+ (m, d, v)
+ for m, d, v in zip(measurements, data_types, values)
+ if v is not None
+ ]
+ )
+ measurements = list(filtered_measurements)
+ data_types = list(filtered_data_types)
+ values = list(filtered_values)
+ if len(measurements) is 0 or len(data_types) is 0 or len(values)
is 0:
+ logger.info("All inserting values are none!")
+ return
request = self.gen_insert_record_req(
device_id, timestamp, measurements, data_types, values
)
@@ -609,6 +641,19 @@ class Session(object):
:param types_lst: 2-D List of TSDataType, each element of outer list
indicates sensor data types of a device
:param values_lst: 2-D List, values to be inserted, for each device
"""
+ if self.__has_none_value(values_lst):
+ (
+ device_ids,
+ times,
+ measurements_lst,
+ types_lst,
+ values_lst,
+ ) = self.__filter_lists_by_values(
+ device_ids, times, measurements_lst, types_lst, values_lst
+ )
+ if len(device_ids) is 0:
+ logger.info("All inserting values are none!")
+ return
if self.__enable_redirection:
request_group = {}
for i in range(len(device_ids)):
@@ -678,6 +723,20 @@ class Session(object):
:param data_types: List of TSDataType, indicate the data type for each
sensor
:param values: List, values to be inserted, for each sensor
"""
+ if self.__has_none_value(values):
+ filtered_measurements, filtered_data_types, filtered_values = zip(
+ *[
+ (m, d, v)
+ for m, d, v in zip(measurements, data_types, values)
+ if v is not None
+ ]
+ )
+ measurements = list(filtered_measurements)
+ data_types = list(filtered_data_types)
+ values = list(filtered_values)
+ if len(measurements) is 0 or len(data_types) is 0 or len(values)
is 0:
+ logger.info("All inserting values are none!")
+ return
request = self.gen_insert_record_req(
device_id, timestamp, measurements, data_types, values, True
)
@@ -711,6 +770,19 @@ class Session(object):
:param types_lst: 2-D List of TSDataType, each element of outer list
indicates sensor data types of a device
:param values_lst: 2-D List, values to be inserted, for each device
"""
+ if self.__has_none_value(values_lst):
+ (
+ device_ids,
+ times,
+ measurements_lst,
+ types_lst,
+ values_lst,
+ ) = self.__filter_lists_by_values(
+ device_ids, times, measurements_lst, types_lst, values_lst
+ )
+ if len(device_ids) is 0:
+ logger.info("All inserting values are none!")
+ return
if self.__enable_redirection:
request_group = {}
for i in range(len(device_ids)):
@@ -1915,6 +1987,49 @@ class Session(object):
)
return request
+    def __has_none_value(self, values_list) -> bool:
+        """Report whether values_list holds a None at any nesting depth.
+
+        Elements that are lists are searched recursively; every other
+        element is only compared against None.
+        """
+        return any(
+            self.__has_none_value(entry) if isinstance(entry, list) else entry is None
+            for entry in values_list
+        )
+
+    @staticmethod
+    def __filter_lists_by_values(
+        device_lst, time_lst, measurements_lst, types_lst, values_lst
+    ):
+        """Strip None values from a batch of records.
+
+        The five arguments are parallel lists with one entry per record. Within
+        a record, each (measurement, type, value) triple whose value is None is
+        removed; a record left with no triples is dropped together with its
+        device and timestamp.
+
+        :return: tuple of five lists (devices, times, measurements, types,
+            values) holding only the surviving records
+        """
+        kept_devices, kept_times = [], []
+        kept_measurements, kept_types, kept_values = [], [], []
+
+        records = zip(device_lst, time_lst, measurements_lst, types_lst, values_lst)
+        for device, time_, row_measurements, row_types, row_values in records:
+            cells = [
+                cell
+                for cell in zip(row_measurements, row_types, row_values)
+                if cell[2] is not None
+            ]
+            if not cells:
+                # Every value of this record was None: skip the whole record.
+                continue
+            kept_devices.append(device)
+            kept_times.append(time_)
+            kept_measurements.append([cell[0] for cell in cells])
+            kept_types.append([cell[1] for cell in cells])
+            kept_values.append([cell[2] for cell in cells])
+
+        return kept_devices, kept_times, kept_measurements, kept_types, kept_values
+
def create_schema_template(self, template: Template):
warnings.warn(
"The APIs about template are deprecated and will be removed in
future versions. Use sql instead.",
diff --git a/iotdb-client/client-py/tests/integration/test_session.py
b/iotdb-client/client-py/tests/integration/test_session.py
index 7695f4133c2..75cdf056113 100644
--- a/iotdb-client/client-py/tests/integration/test_session.py
+++ b/iotdb-client/client-py/tests/integration/test_session.py
@@ -85,11 +85,15 @@ def session_test(use_session_pool=False):
session.set_storage_group("root.sg_test_03")
session.set_storage_group("root.sg_test_04")
- if session.delete_storage_group("root.sg_test_02") < 0:
+ try:
+ session.delete_storage_group("root.sg_test_02")
+ except Exception:
test_fail()
print_message("delete database failed")
- if session.delete_storage_groups(["root.sg_test_03",
"root.sg_test_04"]) < 0:
+ try:
+ session.delete_storage_groups(["root.sg_test_03",
"root.sg_test_04"])
+ except Exception:
test_fail()
print_message("delete databases failed")
@@ -177,7 +181,7 @@ def session_test(use_session_pool=False):
)
# delete time series
- if (
+ try:
session.delete_time_series(
[
"root.sg_test_01.d_01.s_07",
@@ -185,8 +189,7 @@ def session_test(use_session_pool=False):
"root.sg_test_01.d_01.s_09",
]
)
- < 0
- ):
+ except Exception:
test_fail()
print_message("delete time series failed")
@@ -220,12 +223,30 @@ def session_test(use_session_pool=False):
TSDataType.DOUBLE,
TSDataType.TEXT,
]
- if (
+ try:
session.insert_record(
"root.sg_test_01.d_01", 1, measurements_, data_types_, values_
)
- < 0
- ):
+ except Exception:
+ test_fail()
+ print_message("insert record failed")
+
+ # insert one record with none into the database.
+ measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
+ values_ = [False, 10, 11, None, None, None]
+ data_types_ = [
+ TSDataType.BOOLEAN,
+ TSDataType.INT32,
+ TSDataType.INT64,
+ TSDataType.FLOAT,
+ TSDataType.DOUBLE,
+ TSDataType.TEXT,
+ ]
+ try:
+ session.insert_record(
+ "root.sg_test_01.d_01", 1, measurements_, data_types_, values_
+ )
+ except Exception:
test_fail()
print_message("insert record failed")
@@ -240,12 +261,44 @@ def session_test(use_session_pool=False):
]
data_type_list_ = [data_types_, data_types_]
device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_02"]
- if (
+ try:
session.insert_records(
device_ids_, [2, 3], measurements_list_, data_type_list_,
values_list_
)
- < 0
- ):
+ except Exception:
+ test_fail()
+ print_message("insert records failed")
+
+ # insert multiple records with none into database
+ measurements_list_ = [
+ ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+ ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
+ ]
+ values_list_ = [
+ [False, 22, 33, 4.4, 55.1, "test_records01"],
+ [None, None, None, None, 8.125, bytes("test_records02", "utf-8")],
+ ]
+ data_type_list_ = [data_types_, data_types_]
+ device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_02"]
+ try:
+ session.insert_records(
+ device_ids_, [2, 3], measurements_list_, data_type_list_,
values_list_
+ )
+ except Exception:
+ test_fail()
+ print_message("insert records failed")
+
+ values_list_ = [
+ [False, 22, 33, 4.4, 55.1, "test_records01"],
+ [None, None, None, None, None, None],
+ ]
+ data_type_list_ = [data_types_, data_types_]
+ device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_02"]
+ try:
+ session.insert_records(
+ device_ids_, [2, 3], measurements_list_, data_type_list_,
values_list_
+ )
+ except Exception:
test_fail()
print_message("insert records failed")
@@ -261,7 +314,9 @@ def session_test(use_session_pool=False):
"root.sg_test_01.d_01", measurements_, data_types_, values_,
timestamps_
)
- if session.insert_tablet(tablet_) < 0:
+ try:
+ session.insert_tablet(tablet_)
+ except Exception:
test_fail()
print_message("insert tablet failed")
@@ -282,7 +337,9 @@ def session_test(use_session_pool=False):
np_values_,
np_timestamps_,
)
- if session.insert_tablet(np_tablet_) < 0:
+ try:
+ session.insert_tablet(np_tablet_)
+ except Exception:
test_fail()
print_message("insert numpy tablet failed")
@@ -297,7 +354,9 @@ def session_test(use_session_pool=False):
values_,
[12, 13, 14, 15],
)
- if session.insert_tablets([tablet_01, tablet_02]) < 0:
+ try:
+ session.insert_tablets([tablet_01, tablet_02])
+ except Exception:
test_fail()
print_message("insert tablets failed")
@@ -312,7 +371,9 @@ def session_test(use_session_pool=False):
tablet_ = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_,
timestamps_
)
- if session.insert_tablet(tablet_) < 0:
+ try:
+ session.insert_tablet(tablet_)
+ except Exception:
test_fail()
print_message("insert tablet with empty cells failed")
@@ -342,7 +403,9 @@ def session_test(use_session_pool=False):
np_timestamps_,
np_bitmaps_,
)
- if session.insert_tablet(np_tablet_) < 0:
+ try:
+ session.insert_tablet(np_tablet_)
+ except Exception:
test_fail()
print_message("insert numpy tablet with empty cells failed")
@@ -360,7 +423,7 @@ def session_test(use_session_pool=False):
]
values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]]
- if (
+ try:
session.insert_records_of_one_device(
"root.sg_test_01.d_01",
time_list,
@@ -368,8 +431,7 @@ def session_test(use_session_pool=False):
data_types_list,
values_list,
)
- < 0
- ):
+ except Exception:
test_fail()
print_message("insert records of one device failed")