leaves12138 commented on code in PR #22: URL: https://github.com/apache/paimon-cpp/pull/22#discussion_r3303526623
########## src/paimon/common/data/abstract_binary_writer.cpp: ########## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/common/data/abstract_binary_writer.h" + +#include <cassert> +#include <memory> +#include <optional> + +#include "paimon/common/data/binary_array.h" +#include "paimon/common/data/binary_map.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/binary_section.h" +#include "paimon/common/data/binary_string.h" +#include "paimon/common/memory/memory_segment_utils.h" +#include "paimon/data/decimal.h" +#include "paimon/data/timestamp.h" +#include "paimon/io/byte_order.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" + +namespace paimon { + +void AbstractBinaryWriter::WriteBytes(int32_t pos, const Bytes& bytes) { + int32_t len = bytes.size(); + if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) { + WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), bytes, len); + } else { + WriteBytesToVarLenPart(pos, bytes, len); + } +} + +void AbstractBinaryWriter::WriteString(int32_t pos, const BinaryString& input) { + int32_t len = input.GetSizeInBytes(); + if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) { + auto bytes = Bytes::AllocateBytes(len, pool_); + MemorySegmentUtils::CopyToBytes({input.GetSegment()}, input.GetOffset(), bytes.get(), 0, + len); + WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), *bytes, len); + } else { + WriteSegmentToVarLenPart(pos, input.GetSegment(), input.GetOffset(), len); + } +} + +void AbstractBinaryWriter::WriteBinary(int32_t pos, const Bytes& bytes) { + int32_t len = bytes.size(); + if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) { + WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), bytes, len); + } else { + WriteBytesToVarLenPart(pos, bytes, len); + } +} + +void AbstractBinaryWriter::WriteStringView(int32_t pos, const std::string_view& view) { + int32_t len = view.size(); + if (len <= BinarySection::MAX_FIX_PART_DATA_SIZE) { + WriteBytesToFixLenPart(&segment_, GetFieldOffset(pos), view, len); + } else { + WriteBytesToVarLenPart(pos, view, len); + } +} + +void AbstractBinaryWriter::WriteRow(int32_t pos, const BinaryRow& input) { + return WriteSegmentToVarLenPart(pos, input.GetSegment(), input.GetOffset(), + input.GetSizeInBytes()); +} + +void AbstractBinaryWriter::WriteArray(int32_t pos, const BinaryArray& input) { + return WriteSegmentToVarLenPart(pos, input.GetSegment(), input.GetOffset(), + input.GetSizeInBytes()); +} + +void AbstractBinaryWriter::WriteMap(int32_t pos, const BinaryMap& input) { + return WriteSegmentToVarLenPart(pos, input.GetSegment(), input.GetOffset(), + input.GetSizeInBytes()); +} +void AbstractBinaryWriter::WriteDecimal(int32_t pos, const std::optional<Decimal>& value, + int32_t precision) { + assert(value == std::nullopt || precision == value.value().Precision()); + if (Decimal::IsCompact(precision)) { + assert(value != std::nullopt); + WriteLong(pos, value.value().ToUnscaledLong()); + } else { + // grow the global buffer before writing data. + EnsureCapacity(16); + // zero-out 16 bytes + segment_.PutValue<int64_t>(cursor_, 0ll); + segment_.PutValue<int64_t>(cursor_ + 8, 0ll); + + // Make sure Decimal object has the same scale as DecimalType. + // Note that we may pass in null Decimal object to set null for it. + if (value == std::nullopt) { + SetNullBit(pos); + SetOffsetAndSize(pos, cursor_, 0l); + } else { + auto bytes = value.value().ToUnscaledBytes(); + segment_.Put(cursor_, bytes, 0, bytes.size()); + SetOffsetAndSize(pos, cursor_, bytes.size()); + } + // move the cursor forward. + cursor_ += 16; + } +} + +void AbstractBinaryWriter::WriteTimestamp(int32_t pos, const std::optional<Timestamp>& value, + int32_t precision) { + if (Timestamp::IsCompact(precision)) { + assert(value != std::nullopt); + WriteLong(pos, value.value().GetMillisecond()); + } else { + // store the nanoOfMillisecond in fixed-length part as offset and nanoOfMillisecond + EnsureCapacity(8); + + if (value == std::nullopt) { + SetNullBit(pos); + // zero-out the bytes + segment_.PutValue<int64_t>(cursor_, 0l); + SetOffsetAndSize(pos, cursor_, 0l); + } else { + segment_.PutValue<int64_t>(cursor_, value.value().GetMillisecond()); + SetOffsetAndSize(pos, cursor_, value.value().GetNanoOfMillisecond()); + } + cursor_ += 8; + } +} + +void AbstractBinaryWriter::ZeroOutPaddingBytes(int32_t num_bytes) { + if ((num_bytes & 0x07) > 0) { + segment_.PutValue<int64_t>(cursor_ + ((num_bytes >> 3) << 3), 0L); + } +} + +void AbstractBinaryWriter::EnsureCapacity(int32_t needed_size) { + const int32_t length = cursor_ + needed_size; + if (segment_.Size() < length) { + Grow(length); + } +} + +void AbstractBinaryWriter::WriteSegmentToVarLenPart(int32_t pos, const MemorySegment& segment, + int32_t offset, int32_t size) { + const int32_t rounded_size = RoundNumberOfBytesToNearestWord(size); + // grow the global buffer before writing data. + EnsureCapacity(rounded_size); + ZeroOutPaddingBytes(size); + + segment.CopyTo(offset, &segment_, cursor_, size); + SetOffsetAndSize(pos, cursor_, size); + // move the cursor forward. + cursor_ += rounded_size; +} + +template <typename T> +void AbstractBinaryWriter::WriteBytesToVarLenPart(int32_t pos, const T& bytes, int32_t len) { + const int32_t rounded_size = RoundNumberOfBytesToNearestWord(len); + // grow the global buffer before writing data. + EnsureCapacity(rounded_size); + ZeroOutPaddingBytes(len); + // Write the bytes to the variable length portion. + segment_.Put(cursor_, bytes, 0, len); + SetOffsetAndSize(pos, cursor_, len); + // move the cursor forward. + cursor_ += rounded_size; +} + +void AbstractBinaryWriter::Grow(int32_t min_capacity) { + int32_t old_capacity = segment_.Size(); + int32_t new_capacity = old_capacity + (old_capacity >> 1); + if (new_capacity - min_capacity < 0) { + new_capacity = min_capacity; + } + std::shared_ptr<Bytes> new_bytes = + Bytes::CopyOf(*(segment_.GetOrCreateHeapMemory(pool_)), new_capacity, pool_); + segment_ = MemorySegment::Wrap(new_bytes); + AfterGrow(); +} + +int32_t AbstractBinaryWriter::RoundNumberOfBytesToNearestWord(int32_t num_bytes) { + int32_t remainder = num_bytes & 0x07; + if (remainder == 0) { + return num_bytes; + } else { + return num_bytes + (8 - remainder); + } +} +template <typename T> +void AbstractBinaryWriter::WriteBytesToFixLenPart(MemorySegment* segment, int32_t field_offset, + const T& bytes, int32_t len) { + int64_t first_byte = len | 0x80; // first bit is 1, other bits is len + int64_t seven_bytes = 0L; // real data + if ((SystemByteOrder() == ByteOrder::PAIMON_LITTLE_ENDIAN)) { + for (int32_t i = 0; i < len; i++) { + seven_bytes |= ((0x00000000000000FFL & bytes[i]) << (i * 8L)); + } + } else { + for (int32_t i = 0; i < len; i++) { + seven_bytes |= ((0x00000000000000FFL & bytes[i]) << ((6 - i) * 8L)); + } + } + const int64_t offset_and_size = + (first_byte << 56) | // NOLINT(clang-analyzer-core.UndefinedBinaryOperatorResult) Review Comment: This inline-string/binary encoding still has undefined behavior. `first_byte` is an `int64_t`; for inline values `len | 0x80` sets bit 7, so shifting it left by 56 tries to produce the signed sign bit value (`0x80 << 56`), which is outside the representable range of `int64_t`. Please build this field with `uint64_t` (similar to `BinarySection::HIGHEST_FIRST_BIT`) and cast back only when storing it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
