This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch query_v3_py
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/query_v3_py by this push:
     new 368fe6e1652 try CI
368fe6e1652 is described below

commit 368fe6e1652a4813be6d5b3bf5612b739ecbb74c
Author: HTHou <[email protected]>
AuthorDate: Tue Mar 25 10:20:51 2025 +0800

    try CI
---
 .github/workflows/multi-language-client.yml        |   1 +
 .../client-py/iotdb/tsfile/utils/tsblock_serde.py  | 275 +--------------------
 .../client-py/iotdb/utils/iotdb_rpc_dataset.py     | 133 ++--------
 3 files changed, 30 insertions(+), 379 deletions(-)

diff --git a/.github/workflows/multi-language-client.yml b/.github/workflows/multi-language-client.yml
index 095f22557d6..94012b29279 100644
--- a/.github/workflows/multi-language-client.yml
+++ b/.github/workflows/multi-language-client.yml
@@ -4,6 +4,7 @@ on:
     branches:
       - master
       - "rc/*"
+      - query_v3_py
     paths:
       - 'pom.xml'
       - 'iotdb-client/pom.xml'
diff --git a/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py b/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py
index 881113f62e3..34cf02d205e 100644
--- a/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py
+++ b/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py
@@ -15,246 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-import struct
 
 import numpy as np
-import pandas as pd
 
 from iotdb.utils.IoTDBConstants import TSDataType
 
-TIMESTAMP_STR = "Time"
-START_INDEX = 2
-
-
-# convert dataFrame to tsBlock in binary
-# input shouldn't contain time column
-def convert_to_binary(data_frame: pd.DataFrame):
-    data_shape = data_frame.shape
-    value_column_size = data_shape[1]
-    position_count = data_shape[0]
-    keys = data_frame.keys()
-
-    binary = value_column_size.to_bytes(4, byteorder="big")
-
-    for data_type in data_frame.dtypes:
-        binary += _get_type_in_byte(data_type)
-
-    # position count
-    binary += position_count.to_bytes(4, byteorder="big")
-
-    # column encoding
-    binary += b"\x02"
-    for data_type in data_frame.dtypes:
-        binary += _get_encoder(data_type)
-
-    # write columns, the column in index 0 must be timeColumn
-    binary += bool.to_bytes(False, 1, byteorder="big")
-    for i in range(position_count):
-        value = 0
-        v = struct.pack(">i", value)
-        binary += v
-        binary += v
-
-    for i in range(value_column_size):
-        # the value can't be null
-        binary += bool.to_bytes(False, 1, byteorder="big")
-        col = data_frame[keys[i]]
-        for j in range(position_count):
-            value = col[j]
-            if value.dtype.byteorder != ">":
-                value = value.byteswap()
-            binary += value.tobytes()
-
-    return binary
-
-
-# convert tsBlock in binary to dataFrame
-def convert_to_df(name_list, type_list, name_index, binary_list):
-    column_name_list = [TIMESTAMP_STR]
-    column_type_list = [TSDataType.INT64]
-    column_ordinal_dict = {TIMESTAMP_STR: 1}
-
-    if name_index is not None:
-        column_type_deduplicated_list = [None for _ in range(len(name_index))]
-        for i in range(len(name_list)):
-            name = name_list[i]
-            column_name_list.append(name)
-            column_type_list.append(TSDataType[type_list[i]])
-            if name not in column_ordinal_dict:
-                index = name_index[name]
-                column_ordinal_dict[name] = index + START_INDEX
-                column_type_deduplicated_list[index] = TSDataType[type_list[i]]
-    else:
-        index = START_INDEX
-        column_type_deduplicated_list = []
-        for i in range(len(name_list)):
-            name = name_list[i]
-            column_name_list.append(name)
-            column_type_list.append(TSDataType[type_list[i]])
-            if name not in column_ordinal_dict:
-                column_ordinal_dict[name] = index
-                index += 1
-                column_type_deduplicated_list.append(TSDataType[type_list[i]])
-
-    binary_size = len(binary_list)
-    binary_index = 0
-    result = {}
-    for column_name in column_name_list:
-        result[column_name] = None
-
-    while binary_index < binary_size:
-        buffer = binary_list[binary_index]
-        binary_index += 1
-        time_column_values, column_values, null_indicators, _ = deserialize(buffer)
-        time_array = np.frombuffer(
-            time_column_values, np.dtype(np.longlong).newbyteorder(">")
-        )
-        if time_array.dtype.byteorder == ">":
-            time_array = time_array.byteswap().newbyteorder("<")
-
-        if result[TIMESTAMP_STR] is None:
-            result[TIMESTAMP_STR] = time_array
-        else:
-            result[TIMESTAMP_STR] = np.concatenate(
-                (result[TIMESTAMP_STR], time_array), axis=0
-            )
-        total_length = len(time_array)
-
-        for i in range(len(column_values)):
-            column_name = column_name_list[i + 1]
-
-            location = column_ordinal_dict[column_name] - START_INDEX
-            if location < 0:
-                continue
-
-            data_type = column_type_deduplicated_list[location]
-            value_buffer = column_values[location]
-            value_buffer_len = len(value_buffer)
-
-            if data_type == TSDataType.DOUBLE:
-                data_array = np.frombuffer(
-                    value_buffer, np.dtype(np.double).newbyteorder(">")
-                )
-            elif data_type == TSDataType.FLOAT:
-                data_array = np.frombuffer(
-                    value_buffer, np.dtype(np.float32).newbyteorder(">")
-                )
-            elif data_type == TSDataType.BOOLEAN:
-                data_array = []
-                for index in range(len(value_buffer)):
-                    data_array.append(value_buffer[index])
-                data_array = np.array(data_array).astype("bool")
-            elif data_type == TSDataType.INT32:
-                data_array = np.frombuffer(
-                    value_buffer, np.dtype(np.int32).newbyteorder(">")
-                )
-            elif data_type == TSDataType.INT64:
-                data_array = np.frombuffer(
-                    value_buffer, np.dtype(np.int64).newbyteorder(">")
-                )
-            elif data_type == TSDataType.TEXT:
-                index = 0
-                data_array = []
-                while index < value_buffer_len:
-                    value_bytes = value_buffer[index]
-                    value = value_bytes.decode("utf-8")
-                    data_array.append(value)
-                    index += 1
-                data_array = np.array(data_array, dtype=object)
-            else:
-                raise RuntimeError("unsupported data type {}.".format(data_type))
-
-            if data_array.dtype.byteorder == ">":
-                data_array = data_array.byteswap().newbyteorder("<")
-
-            null_indicator = null_indicators[location]
-            if len(data_array) < total_length or (
-                data_type == TSDataType.BOOLEAN and null_indicator is not None
-            ):
-                if data_type == TSDataType.INT32 or data_type == TSDataType.INT64:
-                    tmp_array = np.full(total_length, np.nan, np.float32)
-                elif data_type == TSDataType.FLOAT or data_type == TSDataType.DOUBLE:
-                    tmp_array = np.full(total_length, np.nan, data_array.dtype)
-                elif data_type == TSDataType.BOOLEAN:
-                    tmp_array = np.full(total_length, np.nan, np.float32)
-                elif data_type == TSDataType.TEXT:
-                    tmp_array = np.full(total_length, np.nan, dtype=data_array.dtype)
-                else:
-                    raise Exception("Unsupported dataType in deserialization")
-
-                if null_indicator is not None:
-                    indexes = [not v for v in null_indicator]
-                    if data_type == TSDataType.BOOLEAN:
-                        tmp_array[indexes] = data_array[indexes]
-                    else:
-                        tmp_array[indexes] = data_array
-
-                if data_type == TSDataType.INT32:
-                    tmp_array = pd.Series(tmp_array).astype("Int32")
-                elif data_type == TSDataType.INT64:
-                    tmp_array = pd.Series(tmp_array).astype("Int64")
-                elif data_type == TSDataType.BOOLEAN:
-                    tmp_array = pd.Series(tmp_array).astype("boolean")
-
-                data_array = tmp_array
-
-            if result[column_name] is None:
-                result[column_name] = data_array
-            else:
-                if isinstance(result[column_name], pd.Series):
-                    if not isinstance(data_array, pd.Series):
-                        if data_type == TSDataType.INT32:
-                            data_array = pd.Series(data_array).astype("Int32")
-                        elif data_type == TSDataType.INT64:
-                            data_array = pd.Series(data_array).astype("Int64")
-                        elif data_type == TSDataType.BOOLEAN:
-                            data_array = pd.Series(data_array).astype("boolean")
-                        else:
-                            raise RuntimeError("Series Error")
-                    result[column_name] = result[column_name].append(data_array)
-                else:
-                    result[column_name] = np.concatenate(
-                        (result[column_name], data_array), axis=0
-                    )
-    for k, v in result.items():
-        if v is None:
-            result[k] = []
-    df = pd.DataFrame(result)
-    df = df.reset_index(drop=True)
-    return df
-
-
-def _get_encoder(data_type: pd.Series):
-    if data_type == "bool":
-        return b"\x00"
-    elif data_type == "int32" or data_type == "float32":
-        return b"\x01"
-    elif data_type == "int64" or data_type == "float64":
-        return b"\x02"
-    elif data_type == "texr":
-        return b"\x03"
-
-
-def _get_type_in_byte(data_type: pd.Series):
-    if data_type == "bool":
-        return b"\x00"
-    elif data_type == "int32":
-        return b"\x01"
-    elif data_type == "int64":
-        return b"\x02"
-    elif data_type == "float32":
-        return b"\x03"
-    elif data_type == "float64":
-        return b"\x04"
-    elif data_type == "text":
-        return b"\x05"
-    else:
-        raise RuntimeError(
-            "data_type",
-            data_type,
-            "data_type should be in ['bool', 'int32', 'int64', 'float32', 'float64', 'text']",
-        )
-
 
 # Serialized tsBlock:
 #    +-------------+---------------+---------+------------+-----------+----------+
@@ -316,33 +81,6 @@ def read_column_types(buffer, value_column_count):
     return data_types, new_buffer
 
 
-def get_data_type_byte_from_str(value):
-    """
-    Args:
-        value (str): data type in ['bool', 'int32', 'int64', 'float32', 'float64', 'text']
-    Returns:
-        byte: corresponding data type in [b'\x00', b'\x01', b'\x02', b'\x03', b'\x04', b'\x05']
-    """
-    if value not in ["bool", "int32", "int64", "float32", "float64", "text"]:
-        raise RuntimeError(
-            "data_type",
-            value,
-            "data_type should be in ['bool', 'int32', 'int64', 'float32', 'float64', 'text']",
-        )
-    if value == "bool":
-        return TSDataType.BOOLEAN.value
-    elif value == "int32":
-        return TSDataType.INT32.value
-    elif value == "int64":
-        return TSDataType.INT64.value
-    elif value == "float32":
-        return TSDataType.FLOAT.value
-    elif value == "float64":
-        return TSDataType.DOUBLE.value
-    elif value == "text":
-        return TSDataType.TEXT.value
-
-
 # Read ColumnEncodings
 
 
@@ -376,7 +114,7 @@ def read_int64_column(buffer, data_type, position_count):
     if null_indicators is None:
         size = position_count
     else:
-        size = null_indicators.count(False)
+        size = np.count_nonzero(~null_indicators)
 
     if data_type == 2:
         dtype = ">i8"
@@ -402,7 +140,7 @@ def read_int32_column(buffer, data_type, position_count):
     if null_indicators is None:
         size = position_count
     else:
-        size = null_indicators.count(False)
+        size = np.count_nonzero(~null_indicators)
 
     if data_type == 1:
         dtype = ">i4"
@@ -462,12 +200,12 @@ def read_binary_column(buffer, data_type, position_count):
     if null_indicators is None:
         size = position_count
     else:
-        size = null_indicators.count(False)
-    values = [None] * size
+        size = np.count_nonzero(~null_indicators)
+    values = np.empty(size, dtype=object)
     for i in range(size):
         length, buffer = read_int_from_buffer(buffer)
         res, buffer = read_from_buffer(buffer, length)
-        values[i] = res
+        values[i] = res.tobytes()
     return values, null_indicators, buffer
 
 
@@ -481,8 +219,7 @@ def read_binary_column(buffer, data_type, position_count):
 
 def read_run_length_column(buffer, data_type, position_count):
     encoding, buffer = read_byte_from_buffer(buffer)
-    column, null_indicators, buffer = read_column(encoding, buffer, data_type, 1)
-
+    column, null_indicators, buffer = read_column(encoding[0], buffer, data_type, 1)
     return (
         repeat(column, data_type, position_count),
         None if null_indicators is None else null_indicators * position_count,
diff --git a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
index 1933667b77b..b67865e8be2 100644
--- a/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
+++ b/iotdb-client/client-py/iotdb/utils/iotdb_rpc_dataset.py
@@ -163,69 +163,34 @@ class IoTDBRpcDataSet(object):
             has_pd_series.append(False)
         total_length = 0
         while self.__query_result_index < len(self.__query_result):
-            time_column_values, column_values, null_indicators, current_length = (
-                deserialize(memoryview(self.__query_result[self.__query_result_index]))
+            time_array, column_arrays, null_indicators, array_length = deserialize(
+                memoryview(self.__query_result[self.__query_result_index])
             )
             self.__query_result[self.__query_result_index] = None
             self.__query_result_index += 1
-            time_array = time_column_values
-            if time_array.dtype.byteorder == ">":
-                time_array = time_array.byteswap().view(
-                    time_array.dtype.newbyteorder("<")
-                )
             if self.ignore_timestamp is None or self.ignore_timestamp is False:
                 result[0].append(time_array)
-            total_length += current_length
+            total_length += array_length
             for i, location in enumerate(
                 self.__column_index_2_tsblock_column_index_list
             ):
                 if location < 0:
                     continue
                 data_type = self.__data_type_for_tsblock_column[location]
-                value_buffer = column_values[location]
-                value_buffer_len = len(value_buffer)
-                # DOUBLE
-                if data_type == 4:
-                    data_array = value_buffer
-                # FLOAT
-                elif data_type == 3:
-                    data_array = value_buffer
-                # BOOLEAN
-                elif data_type == 0:
-                    data_array = np.array(value_buffer).astype("bool")
-                # INT32, DATE
-                elif data_type == 1 or data_type == 9:
-                    data_array = value_buffer
-                # INT64, TIMESTAMP
-                elif data_type == 2 or data_type == 8:
-                    data_array = value_buffer
-                # TEXT, STRING, BLOB
-                elif data_type == 5 or data_type == 11 or data_type == 10:
-                    index = 0
-                    data_array = []
-                    while index < value_buffer_len:
-                        data_array.append(value_buffer[index].tobytes())
-                        index += 1
-                    data_array = np.array(data_array, dtype=object)
-                else:
-                    raise RuntimeError("unsupported data type {}.".format(data_type))
-                if data_array.dtype.byteorder == ">":
-                    data_array = data_array.byteswap().view(
-                        data_array.dtype.newbyteorder("<")
-                    )
 
+                column_array = column_arrays[location]
                 null_indicator = null_indicators[location]
 
-                if len(data_array) < current_length or (
+                if len(column_array) < array_length or (
                     data_type == 0 and null_indicator is not None
                 ):
-                    tmp_array = np.full(current_length, None, dtype=object)
+                    tmp_array = np.full(array_length, None, dtype=object)
                     if null_indicator is not None:
                         indexes = [not v for v in null_indicator]
                         if data_type == 0:
-                            tmp_array[indexes] = data_array[indexes]
+                            tmp_array[indexes] = column_array[indexes]
                         else:
-                            tmp_array[indexes] = data_array
+                            tmp_array[indexes] = column_array
 
                     # INT32, DATE
                     if data_type == 1 or data_type == 9:
@@ -243,9 +208,9 @@ class IoTDBRpcDataSet(object):
                     elif data_type == 3 or data_type == 4:
                         tmp_array = pd.Series(tmp_array)
                         has_pd_series[i] = True
-                    data_array = tmp_array
+                    column_array = tmp_array
 
-                result[i].append(data_array)
+                result[i].append(column_array)
         for k, v in result.items():
             if v is None or len(v) < 1 or v[0] is None:
                 result[k] = []
@@ -281,87 +246,35 @@ class IoTDBRpcDataSet(object):
         for i in range(len(self.__column_index_2_tsblock_column_index_list)):
             result[i] = []
         while self._has_next_result_set():
-            time_column_values, column_values, null_indicators, _ = deserialize(
-                self.__query_result[self.__query_result_index]
+            time_array, column_arrays, null_indicators, array_length = deserialize(
+                memoryview(self.__query_result[self.__query_result_index])
             )
             self.__query_result[self.__query_result_index] = None
             self.__query_result_index += 1
-            time_array = np.frombuffer(
-                time_column_values, np.dtype(np.longlong).newbyteorder(">")
-            )
-            if time_array.dtype.byteorder == ">":
-                time_array = time_array.byteswap().view(
-                    time_array.dtype.newbyteorder("<")
-                )
             if self.ignore_timestamp is None or self.ignore_timestamp is False:
                 result[0].append(time_array)
 
-            total_length = len(time_array)
-
             for i, location in enumerate(
                 self.__column_index_2_tsblock_column_index_list
             ):
                 if location < 0:
                     continue
                 data_type = self.__data_type_for_tsblock_column[location]
-                value_buffer = column_values[location]
-                value_buffer_len = len(value_buffer)
-                # DOUBLE
-                if data_type == 4:
-                    data_array = np.frombuffer(
-                        value_buffer, np.dtype(np.double).newbyteorder(">")
-                    )
-                # FLOAT
-                elif data_type == 3:
-                    data_array = np.frombuffer(
-                        value_buffer, np.dtype(np.float32).newbyteorder(">")
-                    )
-                # BOOLEAN
-                elif data_type == 0:
-                    data_array = np.array(value_buffer).astype("bool")
-                # INT32
-                elif data_type == 1:
-                    data_array = np.frombuffer(
-                        value_buffer, np.dtype(np.int32).newbyteorder(">")
-                    )
-                # INT64, TIMESTAMP
-                elif data_type == 2 or data_type == 8:
-                    data_array = np.frombuffer(
-                        value_buffer, np.dtype(np.int64).newbyteorder(">")
-                    )
+                column_array = column_arrays[location]
+                # BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TIMESTAMP, BLOB
+                if data_type in (0, 1, 2, 3, 4, 8, 10):
+                    data_array = column_array
                 # TEXT, STRING
-                elif data_type == 5 or data_type == 11:
-                    index = 0
-                    data_array = []
-                    while index < value_buffer_len:
-                        value_bytes = value_buffer[index].tobytes()
-                        value = value_bytes.decode("utf-8")
-                        data_array.append(value)
-                        index += 1
-                    data_array = pd.Series(data_array).astype(str)
-                # BLOB
-                elif data_type == 10:
-                    index = 0
-                    data_array = []
-                    while index < value_buffer_len:
-                        data_array.append(value_buffer[index].tobytes())
-                        index += 1
-                    data_array = pd.Series(data_array)
+                elif data_type in (5, 11):
+                    data_array = np.array([x.decode("utf-8") for x in column_array])
                 # DATE
                 elif data_type == 9:
-                    data_array = np.frombuffer(
-                        value_buffer, np.dtype(np.int32).newbyteorder(">")
-                    )
-                    data_array = pd.Series(data_array).apply(parse_int_to_date)
+                    data_array = pd.Series(column_array).apply(parse_int_to_date)
                 else:
                     raise RuntimeError("unsupported data type {}.".format(data_type))
-                if data_array.dtype.byteorder == ">" and len(data_array) > 0:
-                    data_array = data_array.byteswap().view(
-                        data_array.dtype.newbyteorder("<")
-                    )
                 tmp_array = []
                 null_indicator = null_indicators[location]
-                if len(data_array) < total_length or (
+                if len(data_array) < array_length or (
                     data_type == 0 and null_indicator is not None
                 ):
                     # BOOLEAN, INT32, INT64, TIMESTAMP
@@ -371,11 +284,11 @@ class IoTDBRpcDataSet(object):
                         or data_type == 2
                         or data_type == 8
                     ):
-                        tmp_array = np.full(total_length, pd.NA, dtype=object)
+                        tmp_array = np.full(array_length, pd.NA, dtype=object)
                     # FLOAT, DOUBLE
                     elif data_type == 3 or data_type == 4:
                         tmp_array = np.full(
-                            total_length, np.nan, dtype=data_array.dtype
+                            array_length, np.nan, dtype=data_array.dtype
                         )
                     # TEXT, STRING, BLOB, DATE
                     elif (
@@ -384,7 +297,7 @@ class IoTDBRpcDataSet(object):
                         or data_type == 10
                         or data_type == 9
                     ):
-                        tmp_array = np.full(total_length, None, dtype=object)
+                        tmp_array = np.full(array_length, None, dtype=object)
 
                     if null_indicator is not None:
                         indexes = [not v for v in null_indicator]

Reply via email to