This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch query_v3_py
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/query_v3_py by this push:
new 368fe6e1652 try CI
368fe6e1652 is described below
commit 368fe6e1652a4813be6d5b3bf5612b739ecbb74c
Author: HTHou <[email protected]>
AuthorDate: Tue Mar 25 10:20:51 2025 +0800
try CI
---
.github/workflows/multi-language-client.yml | 1 +
.../client-py/iotdb/tsfile/utils/tsblock_serde.py | 275 +--------------------
.../client-py/iotdb/utils/iotdb_rpc_dataset.py | 133 ++--------
3 files changed, 30 insertions(+), 379 deletions(-)
diff --git a/.github/workflows/multi-language-client.yml b/.github/workflows/multi-language-client.yml
index 095f22557d6..94012b29279 100644
--- a/.github/workflows/multi-language-client.yml
+++ b/.github/workflows/multi-language-client.yml
@@ -4,6 +4,7 @@ on:
branches:
- master
- "rc/*"
+ - query_v3_py
paths:
- 'pom.xml'
- 'iotdb-client/pom.xml'
diff --git a/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py b/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py
index 881113f62e3..34cf02d205e 100644
--- a/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py
+++ b/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py
@@ -15,246 +15,11 @@
# specific language governing permissions and limitations
# under the License.
#
-import struct
import numpy as np
-import pandas as pd
from iotdb.utils.IoTDBConstants import TSDataType
-TIMESTAMP_STR = "Time"
-START_INDEX = 2
-
-
-# convert dataFrame to tsBlock in binary
-# input shouldn't contain time column
-def convert_to_binary(data_frame: pd.DataFrame):
- data_shape = data_frame.shape
- value_column_size = data_shape[1]
- position_count = data_shape[0]
- keys = data_frame.keys()
-
- binary = value_column_size.to_bytes(4, byteorder="big")
-
- for data_type in data_frame.dtypes:
- binary += _get_type_in_byte(data_type)
-
- # position count
- binary += position_count.to_bytes(4, byteorder="big")
-
- # column encoding
- binary += b"\x02"
- for data_type in data_frame.dtypes:
- binary += _get_encoder(data_type)
-
- # write columns, the column in index 0 must be timeColumn
- binary += bool.to_bytes(False, 1, byteorder="big")
- for i in range(position_count):
- value = 0
- v = struct.pack(">i", value)
- binary += v
- binary += v
-
- for i in range(value_column_size):
- # the value can't be null
- binary += bool.to_bytes(False, 1, byteorder="big")
- col = data_frame[keys[i]]
- for j in range(position_count):
- value = col[j]
- if value.dtype.byteorder != ">":
- value = value.byteswap()
- binary += value.tobytes()
-
- return binary
-
-
-# convert tsBlock in binary to dataFrame
-def convert_to_df(name_list, type_list, name_index, binary_list):
- column_name_list = [TIMESTAMP_STR]
- column_type_list = [TSDataType.INT64]
- column_ordinal_dict = {TIMESTAMP_STR: 1}
-
- if name_index is not None:
- column_type_deduplicated_list = [None for _ in range(len(name_index))]
- for i in range(len(name_list)):
- name = name_list[i]
- column_name_list.append(name)
- column_type_list.append(TSDataType[type_list[i]])
- if name not in column_ordinal_dict:
- index = name_index[name]
- column_ordinal_dict[name] = index + START_INDEX
- column_type_deduplicated_list[index] = TSDataType[type_list[i]]
- else:
- index = START_INDEX
- column_type_deduplicated_list = []
- for i in range(len(name_list)):
- name = name_list[i]
- column_name_list.append(name)
- column_type_list.append(TSDataType[type_list[i]])
- if name not in column_ordinal_dict:
- column_ordinal_dict[name] = index
- index += 1
- column_type_deduplicated_list.append(TSDataType[type_list[i]])
-
- binary_size = len(binary_list)
- binary_index = 0
- result = {}
- for column_name in column_name_list:
- result[column_name] = None
-
- while binary_index < binary_size:
- buffer = binary_list[binary_index]
- binary_index += 1
- time_column_values, column_values, null_indicators, _ =
deserialize(buffer)
- time_array = np.frombuffer(
- time_column_values, np.dtype(np.longlong).newbyteorder(">")
- )
- if time_array.dtype.byteorder == ">":
- time_array = time_array.byteswap().newbyteorder("<")
-
- if result[TIMESTAMP_STR] is None:
- result[TIMESTAMP_STR] = time_array
- else:
- result[TIMESTAMP_STR] = np.concatenate(
- (result[TIMESTAMP_STR], time_array), axis=0
- )
- total_length = len(time_array)
-
- for i in range(len(column_values)):
- column_name = column_name_list[i + 1]
-
- location = column_ordinal_dict[column_name] - START_INDEX
- if location < 0:
- continue
-
- data_type = column_type_deduplicated_list[location]
- value_buffer = column_values[location]
- value_buffer_len = len(value_buffer)
-
- if data_type == TSDataType.DOUBLE:
- data_array = np.frombuffer(
- value_buffer, np.dtype(np.double).newbyteorder(">")
- )
- elif data_type == TSDataType.FLOAT:
- data_array = np.frombuffer(
- value_buffer, np.dtype(np.float32).newbyteorder(">")
- )
- elif data_type == TSDataType.BOOLEAN:
- data_array = []
- for index in range(len(value_buffer)):
- data_array.append(value_buffer[index])
- data_array = np.array(data_array).astype("bool")
- elif data_type == TSDataType.INT32:
- data_array = np.frombuffer(
- value_buffer, np.dtype(np.int32).newbyteorder(">")
- )
- elif data_type == TSDataType.INT64:
- data_array = np.frombuffer(
- value_buffer, np.dtype(np.int64).newbyteorder(">")
- )
- elif data_type == TSDataType.TEXT:
- index = 0
- data_array = []
- while index < value_buffer_len:
- value_bytes = value_buffer[index]
- value = value_bytes.decode("utf-8")
- data_array.append(value)
- index += 1
- data_array = np.array(data_array, dtype=object)
- else:
- raise RuntimeError("unsupported data type
{}.".format(data_type))
-
- if data_array.dtype.byteorder == ">":
- data_array = data_array.byteswap().newbyteorder("<")
-
- null_indicator = null_indicators[location]
- if len(data_array) < total_length or (
- data_type == TSDataType.BOOLEAN and null_indicator is not None
- ):
- if data_type == TSDataType.INT32 or data_type ==
TSDataType.INT64:
- tmp_array = np.full(total_length, np.nan, np.float32)
- elif data_type == TSDataType.FLOAT or data_type ==
TSDataType.DOUBLE:
- tmp_array = np.full(total_length, np.nan, data_array.dtype)
- elif data_type == TSDataType.BOOLEAN:
- tmp_array = np.full(total_length, np.nan, np.float32)
- elif data_type == TSDataType.TEXT:
- tmp_array = np.full(total_length, np.nan,
dtype=data_array.dtype)
- else:
- raise Exception("Unsupported dataType in deserialization")
-
- if null_indicator is not None:
- indexes = [not v for v in null_indicator]
- if data_type == TSDataType.BOOLEAN:
- tmp_array[indexes] = data_array[indexes]
- else:
- tmp_array[indexes] = data_array
-
- if data_type == TSDataType.INT32:
- tmp_array = pd.Series(tmp_array).astype("Int32")
- elif data_type == TSDataType.INT64:
- tmp_array = pd.Series(tmp_array).astype("Int64")
- elif data_type == TSDataType.BOOLEAN:
- tmp_array = pd.Series(tmp_array).astype("boolean")
-
- data_array = tmp_array
-
- if result[column_name] is None:
- result[column_name] = data_array
- else:
- if isinstance(result[column_name], pd.Series):
- if not isinstance(data_array, pd.Series):
- if data_type == TSDataType.INT32:
- data_array = pd.Series(data_array).astype("Int32")
- elif data_type == TSDataType.INT64:
- data_array = pd.Series(data_array).astype("Int64")
- elif data_type == TSDataType.BOOLEAN:
- data_array =
pd.Series(data_array).astype("boolean")
- else:
- raise RuntimeError("Series Error")
- result[column_name] =
result[column_name].append(data_array)
- else:
- result[column_name] = np.concatenate(
- (result[column_name], data_array), axis=0
- )
- for k, v in result.items():
- if v is None:
- result[k] = []
- df = pd.DataFrame(result)
- df = df.reset_index(drop=True)
- return df
-
-
-def _get_encoder(data_type: pd.Series):
- if data_type == "bool":
- return b"\x00"
- elif data_type == "int32" or data_type == "float32":
- return b"\x01"
- elif data_type == "int64" or data_type == "float64":
- return b"\x02"
- elif data_type == "texr":
- return b"\x03"
-
-
-def _get_type_in_byte(data_type: pd.Series):
- if data_type == "bool":
- return b"\x00"
- elif data_type == "int32":
- return b"\x01"
- elif data_type == "int64":
- return b"\x02"
- elif data_type == "float32":
- return b"\x03"
- elif data_type == "float64":
- return b"\x04"
- elif data_type == "text":
- return b"\x05"
- else:
- raise RuntimeError(
- "data_type",
- data_type,
- "data_type should be in ['bool', 'int32', 'int64', 'float32',
'float64', 'text']",
- )
-
# Serialized tsBlock:
# +-------------+---------------+---------+------------+-----------+----------+
@@ -316,33 +81,6 @@ def read_column_types(buffer, value_column_count):
return data_types, new_buffer
-def get_data_type_byte_from_str(value):
- """
- Args:
- value (str): data type in ['bool', 'int32', 'int64', 'float32',
'float64', 'text']
- Returns:
- byte: corresponding data type in [b'\x00', b'\x01', b'\x02', b'\x03',
b'\x04', b'\x05']
- """
- if value not in ["bool", "int32", "int64", "float32", "float64", "text"]:
- raise RuntimeError(
- "data_type",
- value,
- "data_type should be in ['bool', 'int32', 'int64', 'float32',
'float64', 'text']",
- )
- if value == "bool":
- return TSDataType.BOOLEAN.value
- elif value == "int32":
- return TSDataType.INT32.value
- elif value == "int64":
- return TSDataType.INT64.value
- elif value == "float32":
- return TSDataType.FLOAT.value
- elif value == "float64":
- return TSDataType.DOUBLE.value
- elif value == "text":
- return TSDataType.TEXT.value
-
-
# Read ColumnEncodings
@@ -376,7 +114,7 @@ def read_int64_column(buffer, data_type, position_count):
if null_indicators is None:
size = position_count
else:
- size = null_indicators.count(False)
+ size = np.count_nonzero(~null_indicators)
if data_type == 2:
dtype = ">i8"
@@ -402,7 +140,7 @@ def read_int32_column(buffer, data_type, position_count):
if null_indicators is None:
size = position_count
else:
- size = null_indicators.count(False)
+ size = np.count_nonzero(~null_indicators)
if data_type == 1:
dtype = ">i4"
@@ -462,12 +200,12 @@ def read_binary_column(buffer, data_type, position_count):
if null_indicators is None:
size = position_count
else:
- size = null_indicators.count(False)
- values = [None] * size
+ size = np.count_nonzero(~null_indicators)
+ values = np.empty(size, dtype=object)
for i in range(size):
length, buffer = read_int_from_buffer(buffer)
res, buffer = read_from_buffer(buffer, length)
- values[i] = res
+ values[i] = res.tobytes()
return values, null_indicators, buffer
@@ -481,8 +219,7 @@ def read_binary_column(buffer, data_type, position_count):
def read_run_length_column(buffer, data_type, position_count):
encoding, buffer = read_byte_from_buffer(buffer)
- column, null_indicators, buffer = read_column(encoding, buffer, data_type,
1)
-
+ column, null_indicators, buffer = read_column(encoding[0], buffer,
data_type, 1)
return (
repeat(column, data_type, position_count),
None if null_indicators is None else null_indicators * position_count,
diff --git a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
index 1933667b77b..b67865e8be2 100644
--- a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
+++ b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
@@ -163,69 +163,34 @@ class IoTDBRpcDataSet(object):
has_pd_series.append(False)
total_length = 0
while self.__query_result_index < len(self.__query_result):
- time_column_values, column_values, null_indicators, current_length
= (
-
deserialize(memoryview(self.__query_result[self.__query_result_index]))
+ time_array, column_arrays, null_indicators, array_length =
deserialize(
+ memoryview(self.__query_result[self.__query_result_index])
)
self.__query_result[self.__query_result_index] = None
self.__query_result_index += 1
- time_array = time_column_values
- if time_array.dtype.byteorder == ">":
- time_array = time_array.byteswap().view(
- time_array.dtype.newbyteorder("<")
- )
if self.ignore_timestamp is None or self.ignore_timestamp is False:
result[0].append(time_array)
- total_length += current_length
+ total_length += array_length
for i, location in enumerate(
self.__column_index_2_tsblock_column_index_list
):
if location < 0:
continue
data_type = self.__data_type_for_tsblock_column[location]
- value_buffer = column_values[location]
- value_buffer_len = len(value_buffer)
- # DOUBLE
- if data_type == 4:
- data_array = value_buffer
- # FLOAT
- elif data_type == 3:
- data_array = value_buffer
- # BOOLEAN
- elif data_type == 0:
- data_array = np.array(value_buffer).astype("bool")
- # INT32, DATE
- elif data_type == 1 or data_type == 9:
- data_array = value_buffer
- # INT64, TIMESTAMP
- elif data_type == 2 or data_type == 8:
- data_array = value_buffer
- # TEXT, STRING, BLOB
- elif data_type == 5 or data_type == 11 or data_type == 10:
- index = 0
- data_array = []
- while index < value_buffer_len:
- data_array.append(value_buffer[index].tobytes())
- index += 1
- data_array = np.array(data_array, dtype=object)
- else:
- raise RuntimeError("unsupported data type
{}.".format(data_type))
- if data_array.dtype.byteorder == ">":
- data_array = data_array.byteswap().view(
- data_array.dtype.newbyteorder("<")
- )
+ column_array = column_arrays[location]
null_indicator = null_indicators[location]
- if len(data_array) < current_length or (
+ if len(column_array) < array_length or (
data_type == 0 and null_indicator is not None
):
- tmp_array = np.full(current_length, None, dtype=object)
+ tmp_array = np.full(array_length, None, dtype=object)
if null_indicator is not None:
indexes = [not v for v in null_indicator]
if data_type == 0:
- tmp_array[indexes] = data_array[indexes]
+ tmp_array[indexes] = column_array[indexes]
else:
- tmp_array[indexes] = data_array
+ tmp_array[indexes] = column_array
# INT32, DATE
if data_type == 1 or data_type == 9:
@@ -243,9 +208,9 @@ class IoTDBRpcDataSet(object):
elif data_type == 3 or data_type == 4:
tmp_array = pd.Series(tmp_array)
has_pd_series[i] = True
- data_array = tmp_array
+ column_array = tmp_array
- result[i].append(data_array)
+ result[i].append(column_array)
for k, v in result.items():
if v is None or len(v) < 1 or v[0] is None:
result[k] = []
@@ -281,87 +246,35 @@ class IoTDBRpcDataSet(object):
for i in range(len(self.__column_index_2_tsblock_column_index_list)):
result[i] = []
while self._has_next_result_set():
- time_column_values, column_values, null_indicators, _ =
deserialize(
- self.__query_result[self.__query_result_index]
+ time_array, column_arrays, null_indicators, array_length =
deserialize(
+ memoryview(self.__query_result[self.__query_result_index])
)
self.__query_result[self.__query_result_index] = None
self.__query_result_index += 1
- time_array = np.frombuffer(
- time_column_values, np.dtype(np.longlong).newbyteorder(">")
- )
- if time_array.dtype.byteorder == ">":
- time_array = time_array.byteswap().view(
- time_array.dtype.newbyteorder("<")
- )
if self.ignore_timestamp is None or self.ignore_timestamp is False:
result[0].append(time_array)
- total_length = len(time_array)
-
for i, location in enumerate(
self.__column_index_2_tsblock_column_index_list
):
if location < 0:
continue
data_type = self.__data_type_for_tsblock_column[location]
- value_buffer = column_values[location]
- value_buffer_len = len(value_buffer)
- # DOUBLE
- if data_type == 4:
- data_array = np.frombuffer(
- value_buffer, np.dtype(np.double).newbyteorder(">")
- )
- # FLOAT
- elif data_type == 3:
- data_array = np.frombuffer(
- value_buffer, np.dtype(np.float32).newbyteorder(">")
- )
- # BOOLEAN
- elif data_type == 0:
- data_array = np.array(value_buffer).astype("bool")
- # INT32
- elif data_type == 1:
- data_array = np.frombuffer(
- value_buffer, np.dtype(np.int32).newbyteorder(">")
- )
- # INT64, TIMESTAMP
- elif data_type == 2 or data_type == 8:
- data_array = np.frombuffer(
- value_buffer, np.dtype(np.int64).newbyteorder(">")
- )
+ column_array = column_arrays[location]
+ # BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TIMESTAMP, BLOB
+ if data_type in (0, 1, 2, 3, 4, 8, 10):
+ data_array = column_array
# TEXT, STRING
- elif data_type == 5 or data_type == 11:
- index = 0
- data_array = []
- while index < value_buffer_len:
- value_bytes = value_buffer[index].tobytes()
- value = value_bytes.decode("utf-8")
- data_array.append(value)
- index += 1
- data_array = pd.Series(data_array).astype(str)
- # BLOB
- elif data_type == 10:
- index = 0
- data_array = []
- while index < value_buffer_len:
- data_array.append(value_buffer[index].tobytes())
- index += 1
- data_array = pd.Series(data_array)
+ elif data_type in (5, 11):
+ data_array = np.array([x.decode("utf-8") for x in
column_array])
# DATE
elif data_type == 9:
- data_array = np.frombuffer(
- value_buffer, np.dtype(np.int32).newbyteorder(">")
- )
- data_array = pd.Series(data_array).apply(parse_int_to_date)
+ data_array =
pd.Series(column_array).apply(parse_int_to_date)
else:
raise RuntimeError("unsupported data type
{}.".format(data_type))
- if data_array.dtype.byteorder == ">" and len(data_array) > 0:
- data_array = data_array.byteswap().view(
- data_array.dtype.newbyteorder("<")
- )
tmp_array = []
null_indicator = null_indicators[location]
- if len(data_array) < total_length or (
+ if len(data_array) < array_length or (
data_type == 0 and null_indicator is not None
):
# BOOLEAN, INT32, INT64, TIMESTAMP
@@ -371,11 +284,11 @@ class IoTDBRpcDataSet(object):
or data_type == 2
or data_type == 8
):
- tmp_array = np.full(total_length, pd.NA, dtype=object)
+ tmp_array = np.full(array_length, pd.NA, dtype=object)
# FLOAT, DOUBLE
elif data_type == 3 or data_type == 4:
tmp_array = np.full(
- total_length, np.nan, dtype=data_array.dtype
+ array_length, np.nan, dtype=data_array.dtype
)
# TEXT, STRING, BLOB, DATE
elif (
@@ -384,7 +297,7 @@ class IoTDBRpcDataSet(object):
or data_type == 10
or data_type == 9
):
- tmp_array = np.full(total_length, None, dtype=object)
+ tmp_array = np.full(array_length, None, dtype=object)
if null_indicator is not None:
indexes = [not v for v in null_indicator]