This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch query_v2_py
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 811c575b66d331ccd1d3ba413228f01d00b87858
Author: HTHou <[email protected]>
AuthorDate: Wed Mar 19 18:06:40 2025 +0800

    init
---
 .../client-py/iotdb/tsfile/utils/tsblock_serde.py  | 552 +++++++++++++++++++++
 1 file changed, 552 insertions(+)

diff --git a/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py 
b/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py
new file mode 100644
index 00000000000..149e62acd65
--- /dev/null
+++ b/iotdb-client/client-py/iotdb/tsfile/utils/tsblock_serde.py
@@ -0,0 +1,552 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import struct
+
+import numpy as np
+import pandas as pd
+
+from iotdb.utils.IoTDBConstants import TSDataType
+
# Name of the synthetic time column placed first in every result DataFrame.
TIMESTAMP_STR = "Time"
# Ordinal of the first value column; ordinal 1 is reserved for the Time column.
START_INDEX = 2
+
+
+# convert dataFrame to tsBlock in binary
+# input shouldn't contain time column
# convert dataFrame to tsBlock in binary
# input shouldn't contain time column
def convert_to_binary(data_frame: pd.DataFrame):
    """Serialize *data_frame* (value columns only) into the TsBlock binary layout.

    Layout: value-column count (int32 BE), one type byte per column,
    position count (int32 BE), column encodings (time column first),
    a zero-filled time column, then each value column in big-endian.
    """
    row_count = data_frame.shape[0]
    column_count = data_frame.shape[1]
    column_names = data_frame.keys()
    dtypes = list(data_frame.dtypes)

    out = bytearray()
    # value column count
    out += column_count.to_bytes(4, byteorder="big")
    # one type byte per value column
    for dtype in dtypes:
        out += _get_type_in_byte(dtype)
    # position count
    out += row_count.to_bytes(4, byteorder="big")
    # encodings: time column (INT64-array encoding) first, then value columns
    out += b"\x02"
    for dtype in dtypes:
        out += _get_encoder(dtype)

    # time column: "may have null" flag = False, then a placeholder of
    # two big-endian int32 zeros per row
    out += b"\x00"
    zero_pair = struct.pack(">i", 0) * 2
    for _ in range(row_count):
        out += zero_pair

    # value columns: "may have null" flag = False (nulls are not supported
    # here), then every cell serialized big-endian
    for name in column_names:
        out += b"\x00"
        column = data_frame[name]
        for row in range(row_count):
            cell = column[row]
            if cell.dtype.byteorder != ">":
                cell = cell.byteswap()
            out += cell.tobytes()

    return bytes(out)
+
+
+# convert tsBlock in binary to dataFrame
# convert tsBlock in binary to dataFrame
def convert_to_df(name_list, type_list, name_index, binary_list):
    """Deserialize a list of binary TsBlocks into one pandas DataFrame.

    :param name_list: column names reported by the server (may contain duplicates)
    :param type_list: data-type name (str, a TSDataType key) per entry of name_list
    :param name_index: optional dict mapping column name -> deduplicated column
        position; None when the server did not deduplicate columns
    :param binary_list: list of serialized TsBlock byte buffers
    :return: DataFrame whose first column is the "Time" column

    Fixes vs. the original: ``pd.Series.append`` (removed in pandas 2.0) is
    replaced with ``pd.concat``; ``ndarray.byteswap().newbyteorder(...)``
    (``ndarray.newbyteorder`` was removed in NumPy 2.0) is replaced with
    ``astype`` to the native dtype, which performs the same conversion.
    """
    column_name_list = [TIMESTAMP_STR]
    column_type_list = [TSDataType.INT64]
    column_ordinal_dict = {TIMESTAMP_STR: 1}

    if name_index is not None:
        column_type_deduplicated_list = [None for _ in range(len(name_index))]
        for i in range(len(name_list)):
            name = name_list[i]
            column_name_list.append(name)
            column_type_list.append(TSDataType[type_list[i]])
            if name not in column_ordinal_dict:
                index = name_index[name]
                column_ordinal_dict[name] = index + START_INDEX
                column_type_deduplicated_list[index] = TSDataType[type_list[i]]
    else:
        index = START_INDEX
        column_type_deduplicated_list = []
        for i in range(len(name_list)):
            name = name_list[i]
            column_name_list.append(name)
            column_type_list.append(TSDataType[type_list[i]])
            if name not in column_ordinal_dict:
                column_ordinal_dict[name] = index
                index += 1
                column_type_deduplicated_list.append(TSDataType[type_list[i]])

    binary_size = len(binary_list)
    binary_index = 0
    result = {}
    for column_name in column_name_list:
        result[column_name] = None

    while binary_index < binary_size:
        buffer = binary_list[binary_index]
        binary_index += 1
        time_column_values, column_values, null_indicators, _ = deserialize(buffer)
        # frombuffer yields a big-endian view; astype converts to native int64
        time_array = np.frombuffer(
            time_column_values, np.dtype(np.longlong).newbyteorder(">")
        ).astype(np.int64)

        if result[TIMESTAMP_STR] is None:
            result[TIMESTAMP_STR] = time_array
        else:
            result[TIMESTAMP_STR] = np.concatenate(
                (result[TIMESTAMP_STR], time_array), axis=0
            )
        total_length = len(time_array)

        for i in range(len(column_values)):
            column_name = column_name_list[i + 1]

            location = column_ordinal_dict[column_name] - START_INDEX
            if location < 0:
                continue

            data_type = column_type_deduplicated_list[location]
            value_buffer = column_values[location]

            if data_type == TSDataType.DOUBLE:
                data_array = np.frombuffer(
                    value_buffer, np.dtype(np.double).newbyteorder(">")
                ).astype(np.double)
            elif data_type == TSDataType.FLOAT:
                data_array = np.frombuffer(
                    value_buffer, np.dtype(np.float32).newbyteorder(">")
                ).astype(np.float32)
            elif data_type == TSDataType.BOOLEAN:
                # value_buffer is already a list of bools (see read_byte_column)
                data_array = np.array(list(value_buffer)).astype("bool")
            elif data_type == TSDataType.INT32:
                data_array = np.frombuffer(
                    value_buffer, np.dtype(np.int32).newbyteorder(">")
                ).astype(np.int32)
            elif data_type == TSDataType.INT64:
                data_array = np.frombuffer(
                    value_buffer, np.dtype(np.int64).newbyteorder(">")
                ).astype(np.int64)
            elif data_type == TSDataType.TEXT:
                # value_buffer is a list of bytes entries (see read_binary_column)
                data_array = np.array(
                    [entry.decode("utf-8") for entry in value_buffer], dtype=object
                )
            else:
                raise RuntimeError("unsupported data type {}.".format(data_type))

            null_indicator = null_indicators[location]
            if len(data_array) < total_length or (
                data_type == TSDataType.BOOLEAN and null_indicator is not None
            ):
                # Column contains nulls: expand into a full-length array with
                # NaN at null positions, then switch to pandas nullable dtypes
                # where numpy has no native null representation.
                if data_type == TSDataType.INT32 or data_type == TSDataType.INT64:
                    tmp_array = np.full(total_length, np.nan, np.float32)
                elif data_type == TSDataType.FLOAT or data_type == TSDataType.DOUBLE:
                    tmp_array = np.full(total_length, np.nan, data_array.dtype)
                elif data_type == TSDataType.BOOLEAN:
                    tmp_array = np.full(total_length, np.nan, np.float32)
                elif data_type == TSDataType.TEXT:
                    tmp_array = np.full(total_length, np.nan, dtype=data_array.dtype)
                else:
                    raise Exception("Unsupported dataType in deserialization")

                if null_indicator is not None:
                    indexes = [not v for v in null_indicator]
                    if data_type == TSDataType.BOOLEAN:
                        tmp_array[indexes] = data_array[indexes]
                    else:
                        tmp_array[indexes] = data_array

                if data_type == TSDataType.INT32:
                    tmp_array = pd.Series(tmp_array).astype("Int32")
                elif data_type == TSDataType.INT64:
                    tmp_array = pd.Series(tmp_array).astype("Int64")
                elif data_type == TSDataType.BOOLEAN:
                    tmp_array = pd.Series(tmp_array).astype("boolean")

                data_array = tmp_array

            if result[column_name] is None:
                result[column_name] = data_array
            else:
                if isinstance(result[column_name], pd.Series):
                    if not isinstance(data_array, pd.Series):
                        if data_type == TSDataType.INT32:
                            data_array = pd.Series(data_array).astype("Int32")
                        elif data_type == TSDataType.INT64:
                            data_array = pd.Series(data_array).astype("Int64")
                        elif data_type == TSDataType.BOOLEAN:
                            data_array = pd.Series(data_array).astype("boolean")
                        else:
                            raise RuntimeError("Series Error")
                    # Series.append was removed in pandas 2.0; concat is the
                    # supported equivalent
                    result[column_name] = pd.concat(
                        (result[column_name], data_array)
                    )
                else:
                    result[column_name] = np.concatenate(
                        (result[column_name], data_array), axis=0
                    )
    for k, v in result.items():
        if v is None:
            result[k] = []
    df = pd.DataFrame(result)
    df = df.reset_index(drop=True)
    return df
+
+
+def _get_encoder(data_type: pd.Series):
+    if data_type == "bool":
+        return b"\x00"
+    elif data_type == "int32" or data_type == "float32":
+        return b"\x01"
+    elif data_type == "int64" or data_type == "float64":
+        return b"\x02"
+    elif data_type == "texr":
+        return b"\x03"
+
+
+def _get_type_in_byte(data_type: pd.Series):
+    if data_type == "bool":
+        return b"\x00"
+    elif data_type == "int32":
+        return b"\x01"
+    elif data_type == "int64":
+        return b"\x02"
+    elif data_type == "float32":
+        return b"\x03"
+    elif data_type == "float64":
+        return b"\x04"
+    elif data_type == "text":
+        return b"\x05"
+    else:
+        raise BadConfigValueError(
+            "data_type",
+            data_type,
+            "data_type should be in ['bool', 'int32', 'int64', 'float32', 
'float64', 'text']",
+        )
+
+
+# Serialized tsBlock:
+#    
+-------------+---------------+---------+------------+-----------+----------+
+#    | val col cnt | val col types | pos cnt | encodings  | time col  | val 
col  |
+#    
+-------------+---------------+---------+------------+-----------+----------+
+#    | int32       | list[byte]    | int32   | list[byte] |  bytes    | byte   
  |
+#    
+-------------+---------------+---------+------------+-----------+----------+
+
+
def deserialize(buffer):
    """Parse one serialized TsBlock.

    Layout: value-column count (int32), value-column type bytes,
    position count (int32), column encodings (time column first),
    the time column, then each value column.

    :return: (time column bytes, list of value-column payloads,
        list of null-indicator lists (or None), position count)
    """
    value_column_count, buffer = read_int_from_buffer(buffer)
    data_types, buffer = read_column_types(buffer, value_column_count)
    position_count, buffer = read_int_from_buffer(buffer)
    # one extra encoding byte: index 0 belongs to the time column
    column_encodings, buffer = read_column_encoding(buffer, value_column_count + 1)

    time_column_values, buffer = read_time_column(buffer, position_count)

    column_values = []
    null_indicators = []
    for column_index in range(value_column_count):
        value, nulls, buffer = read_column(
            column_encodings[column_index + 1],
            buffer,
            data_types[column_index],
            position_count,
        )
        column_values.append(value)
        null_indicators.append(nulls)

    return time_column_values, column_values, null_indicators, position_count
+
+
+# General Methods
+
+
def read_int_from_buffer(buffer):
    """Pop a big-endian 4-byte integer off the front of *buffer*."""
    raw, remaining = read_from_buffer(buffer, 4)
    value = int.from_bytes(raw, "big")
    return value, remaining
+
+
def read_byte_from_buffer(buffer):
    """Pop a single byte (as a length-1 bytes object) off the front of *buffer*."""
    single, remaining = read_from_buffer(buffer, 1)
    return single, remaining
+
+
def read_from_buffer(buffer, size):
    """Split *buffer* into its first *size* bytes and the remainder."""
    return buffer[:size], buffer[size:]
+
+
+# Read ColumnType
+
+
def read_column_types(buffer, value_column_count):
    """Read one TSDataType byte per value column from the buffer head."""
    data_types = []
    remaining = buffer
    for _ in range(value_column_count):
        type_byte, remaining = read_byte_from_buffer(remaining)
        data_types.append(get_data_type(type_byte))
    return data_types, remaining
+
+
def get_data_type(value):
    """Map a serialized type byte to its TSDataType (None if unknown)."""
    type_by_byte = {
        b"\x00": TSDataType.BOOLEAN,
        b"\x01": TSDataType.INT32,
        b"\x02": TSDataType.INT64,
        b"\x03": TSDataType.FLOAT,
        b"\x04": TSDataType.DOUBLE,
        b"\x05": TSDataType.TEXT,
    }
    return type_by_byte.get(value)
+
+
def get_data_type_byte_from_str(value):
    """
    Args:
        value (str): data type in ['bool', 'int32', 'int64', 'float32', 'float64', 'text']
    Returns:
        byte: corresponding data type in [b'\\x00', b'\\x01', b'\\x02', b'\\x03', b'\\x04', b'\\x05']
    Raises:
        ValueError: if value is not one of the supported type names.
    """
    if value not in ["bool", "int32", "int64", "float32", "float64", "text"]:
        # BadConfigValueError was referenced but never imported in this
        # module, so bad input crashed with NameError; raise ValueError
        raise ValueError(
            "data_type should be in ['bool', 'int32', 'int64', "
            "'float32', 'float64', 'text'], got {}".format(value)
        )
    if value == "bool":
        return TSDataType.BOOLEAN.value
    elif value == "int32":
        return TSDataType.INT32.value
    elif value == "int64":
        return TSDataType.INT64.value
    elif value == "float32":
        return TSDataType.FLOAT.value
    elif value == "float64":
        return TSDataType.DOUBLE.value
    else:
        return TSDataType.TEXT.value
+
+
+# Read ColumnEncodings
+
+
def read_column_encoding(buffer, size):
    """Read *size* single-byte column encodings from the buffer head."""
    encodings = [buffer[i : i + 1] for i in range(size)]
    return encodings, buffer[size:]
+
+
+# Read Column
+
+
def deserialize_null_indicators(buffer, size):
    """Read the "may have null" flag; when set, decode *size* null bits.

    :return: (list[bool] of null flags, remaining buffer) or (None, remaining
        buffer) when the column has no nulls.
    """
    flag, buffer = read_byte_from_buffer(buffer)
    if flag == b"\x00":
        return None, buffer
    return deserialize_from_boolean_array(buffer, size)
+
+
+# Serialized data layout:
+#    +---------------+-----------------+-------------+
+#    | may have null | null indicators |   values    |
+#    +---------------+-----------------+-------------+
+#    | byte          | list[byte]      | list[int64] |
+#    +---------------+-----------------+-------------+
+
+
def read_time_column(buffer, size):
    """Read the time column: *size* big-endian int64 values, nulls forbidden."""
    null_indicators, buffer = deserialize_null_indicators(buffer, size)
    if null_indicators is not None:
        raise Exception("TimeColumn should not contains null value")
    values, buffer = read_from_buffer(buffer, size * 8)
    return values, buffer
+
+
def read_int64_column(buffer, data_type, position_count):
    """Read an INT64/DOUBLE column: null indicators, then one 8-byte
    big-endian value per non-null position.

    :return: (raw value bytes, null indicator list or None, remaining buffer)
    """
    null_indicators, buffer = deserialize_null_indicators(buffer, position_count)
    if null_indicators is None:
        size = position_count
    else:
        size = null_indicators.count(False)

    if TSDataType.INT64 == data_type or TSDataType.DOUBLE == data_type:
        values, buffer = read_from_buffer(buffer, size * 8)
        return values, null_indicators, buffer
    # str() is required: concatenating the enum to a str raised TypeError
    raise Exception("Invalid data type: " + str(data_type))
+
+
+# Serialized data layout:
+#    +---------------+-----------------+-------------+
+#    | may have null | null indicators |   values    |
+#    +---------------+-----------------+-------------+
+#    | byte          | list[byte]      | list[int32] |
+#    +---------------+-----------------+-------------+
+
+
def read_int32_column(buffer, data_type, position_count):
    """Read an INT32/FLOAT column: null indicators, then one 4-byte
    big-endian value per non-null position.

    :return: (raw value bytes, null indicator list or None, remaining buffer)
    """
    null_indicators, buffer = deserialize_null_indicators(buffer, position_count)
    if null_indicators is None:
        size = position_count
    else:
        size = null_indicators.count(False)

    if TSDataType.INT32 == data_type or TSDataType.FLOAT == data_type:
        values, buffer = read_from_buffer(buffer, size * 4)
        return values, null_indicators, buffer
    # str() is required: concatenating the enum to a str raised TypeError
    raise Exception("Invalid data type: " + str(data_type))
+
+
+# Serialized data layout:
+#    +---------------+-----------------+-------------+
+#    | may have null | null indicators |   values    |
+#    +---------------+-----------------+-------------+
+#    | byte          | list[byte]      | list[byte] |
+#    +---------------+-----------------+-------------+
+
+
def read_byte_column(buffer, data_type, position_count):
    """Read a BOOLEAN column: null indicators, then *position_count*
    bit-packed boolean values.

    :return: (list of bools, null indicator list or None, remaining buffer)
    """
    if data_type != TSDataType.BOOLEAN:
        # str() is required: concatenating the enum to a str raised TypeError
        raise Exception("Invalid data type: " + str(data_type))
    null_indicators, buffer = deserialize_null_indicators(buffer, position_count)
    res, buffer = deserialize_from_boolean_array(buffer, position_count)
    return res, null_indicators, buffer
+
+
def deserialize_from_boolean_array(buffer, size):
    """Decode *size* bit-packed booleans (MSB first) from the buffer head.

    :return: (list[bool] of length *size*, remaining buffer)
    """
    packed, buffer = read_from_buffer(buffer, (size + 7) // 8)
    # position p lives in bit (7 - p % 8) of byte p // 8
    flags = [
        (packed[position >> 3] & (0x80 >> (position & 0b111))) != 0
        for position in range(size)
    ]
    return flags, buffer
+
+
+# Serialized data layout:
+#    +---------------+-----------------+-------------+
+#    | may have null | null indicators |   values    |
+#    +---------------+-----------------+-------------+
+#    | byte          | list[byte]      | list[entry] |
+#    +---------------+-----------------+-------------+
+#
+# Each entry is represented as:
+#    +---------------+-------+
+#    | value length  | value |
+#    +---------------+-------+
+#    | int32         | bytes |
+#    +---------------+-------+
+
+
def read_binary_column(buffer, data_type, position_count):
    """Read a TEXT column: null indicators, then one (int32 length, bytes)
    entry per non-null position.

    :return: (list of bytes values, null indicator list or None, remaining buffer)
    """
    if data_type != TSDataType.TEXT:
        # str() is required: concatenating the enum to a str raised TypeError
        raise Exception("Invalid data type: " + str(data_type))
    null_indicators, buffer = deserialize_null_indicators(buffer, position_count)

    if null_indicators is None:
        size = position_count
    else:
        size = null_indicators.count(False)
    values = [None] * size
    for i in range(size):
        length, buffer = read_int_from_buffer(buffer)
        res, buffer = read_from_buffer(buffer, length)
        values[i] = res
    return values, null_indicators, buffer
+
+
def read_column(encoding, buffer, data_type, position_count):
    """Dispatch on the column-encoding byte and read one column.

    :return: (values, null indicator list or None, remaining buffer)
    """
    if encoding == b"\x00":
        return read_byte_column(buffer, data_type, position_count)
    elif encoding == b"\x01":
        return read_int32_column(buffer, data_type, position_count)
    elif encoding == b"\x02":
        return read_int64_column(buffer, data_type, position_count)
    elif encoding == b"\x03":
        return read_binary_column(buffer, data_type, position_count)
    elif encoding == b"\x04":
        return read_run_length_column(buffer, data_type, position_count)
    else:
        # str() is required: concatenating bytes to a str raised TypeError
        raise Exception("Unsupported encoding: " + str(encoding))
+
+
+# Serialized data layout:
+#    +-----------+-------------------------+
+#    | encoding  | serialized inner column |
+#    +-----------+-------------------------+
+#    | byte      | list[byte]              |
+#    +-----------+-------------------------+
+
+
def read_run_length_column(buffer, data_type, position_count):
    """Read a run-length-encoded column: one inner value, repeated
    *position_count* times.

    :return: (repeated values, repeated null indicators or None, remaining buffer)
    """
    encoding, buffer = read_byte_from_buffer(buffer)
    column, null_indicators, buffer = read_column(encoding, buffer, data_type, 1)

    # null_indicators is None when the inner column has no nulls; the
    # original evaluated None * position_count and crashed with TypeError
    repeated_nulls = (
        None if null_indicators is None else null_indicators * position_count
    )
    return repeat(column, data_type, position_count), repeated_nulls, buffer
+
+
def repeat(buffer, data_type, position_count):
    """Repeat the single-entry column payload *position_count* times.

    Sequence multiplication concatenates both list-backed columns
    (BOOLEAN/TEXT) and bytes-backed numeric columns, so one expression
    covers every data type. The original numeric branch called
    ``res.join(buffer)`` and discarded the result, always returning b"".

    :param buffer: list (BOOLEAN/TEXT) or bytes (numeric) holding one entry
    :param data_type: kept for interface compatibility; no longer needed
    :param position_count: number of repetitions
    """
    return buffer * position_count

Reply via email to