my-ship-it commented on code in PR #1337:
URL: https://github.com/apache/cloudberry/pull/1337#discussion_r2309251004
########## contrib/pax_storage/src/cpp/storage/columns/pax_encoding_non_fixed_column.cc: ########## @@ -61,19 +61,40 @@ void PaxNonFixedEncodingColumn::InitEncoder() { void PaxNonFixedEncodingColumn::InitOffsetStreamCompressor() { Assert(encoder_options_.offsets_encode_type != ColumnEncoding_Kind::ColumnEncoding_Kind_DEF_ENCODED); - offsets_compressor_ = PaxCompressor::CreateBlockCompressor( - encoder_options_.offsets_encode_type); SetOffsetsEncodeType(encoder_options_.offsets_encode_type); SetOffsetsCompressLevel(encoder_options_.offsets_compress_level); + + // Prefer encoder for offsets if encoding type supports it; otherwise use + // compressor + if (encoder_options_.offsets_encode_type == + ColumnEncoding_Kind::ColumnEncoding_Kind_DIRECT_DELTA) { + PaxEncoder::EncodingOption opt = encoder_options_; + opt.column_encode_type = encoder_options_.offsets_encode_type; + // offsets are fixed-width, do not enable non_fixed streaming restriction + offsets_encoder_ = PaxEncoder::CreateStreamingEncoder(opt, false); + } else { + offsets_compressor_ = PaxCompressor::CreateBlockCompressor( Review Comment: Do we still need BlockCompressor for offsets? ########## contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc: ########## @@ -0,0 +1,517 @@ +/*------------------------------------------------------------------------- + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * pax_delta_encoding.cc + * + * IDENTIFICATION + * contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc + * + *------------------------------------------------------------------------- + */ +#include "storage/columns/pax_delta_encoding.h" + +#include <algorithm> +#include <cstring> +#include <vector> + +namespace pax { + +// delta bitpack encoder +template <typename T> +PaxDeltaEncoder<T>::PaxDeltaEncoder(const EncodingOption &encoder_options) + : PaxEncoder(encoder_options) {} + +template <typename T> +void PaxDeltaEncoder<T>::Append(char *data, size_t size) { + CBDB_CHECK(!has_append_, cbdb::CException::kExTypeAbort, + fmt("PaxDeltaEncoder::Append only support Append Once")); + has_append_ = true; + + auto T_data = reinterpret_cast<T *>(data); + auto T_data_len = size / sizeof(T); + Encode(T_data, T_data_len); +} + +inline uint8_t NumBitsAllowZero(uint32_t value) { + if (value == 0) return 0; + uint8_t bits = 0; + while (value) { + bits++; + value >>= 1; + } + return bits; +} + +// Fast bit width calculation (0 -> 0) +inline uint8_t FastNumBits(uint32_t v) { +#if defined(__GNUC__) || defined(__clang__) + return v == 0 ? 
0 : static_cast<uint8_t>(32 - __builtin_clz(v)); +#else + uint8_t bits = 0; + while (v) { + ++bits; + v >>= 1; + } + return bits; +#endif +} + +// 64-bit bit writer based on raw pointer (writes to reserved DataBuffer range) +struct BitWriter64Ptr { + uint8_t *out; + size_t capacity; + size_t index; + uint64_t bit_buffer; + uint32_t bit_count; + + BitWriter64Ptr(uint8_t *p, size_t cap) + : out(p), capacity(cap), index(0), bit_buffer(0), bit_count(0) {} + + inline void Write(uint32_t value, uint8_t width) { + if (width == 0) return; + bit_buffer |= (static_cast<uint64_t>(value) << bit_count); + bit_count += width; + while (bit_count >= 8) { + out[index++] = static_cast<uint8_t>(bit_buffer & 0xFF); + bit_buffer >>= 8; + bit_count -= 8; + } + } + + inline void FlushToByte() { + if (bit_count > 0) { + out[index++] = static_cast<uint8_t>(bit_buffer & 0xFF); + bit_buffer = 0; + bit_count = 0; + } + } +}; + +// 64-bit bit reader based on raw pointer (limited to specified payload bytes) +struct BitReader64Ptr { + const uint8_t *in; + size_t size; + size_t index; + uint64_t bit_buffer; + uint32_t bit_count; + + BitReader64Ptr(const uint8_t *p, size_t len) + : in(p), size(len), index(0), bit_buffer(0), bit_count(0) {} + + inline void Ensure(uint32_t need_bits) { + while (bit_count < need_bits && index < size) { + bit_buffer |= (static_cast<uint64_t>(in[index]) << bit_count); + ++index; + bit_count += 8; + } + } + + inline uint32_t Read(uint8_t width) { + if (width == 0) return 0; + Ensure(width); + uint32_t result; + if (width == 32) + result = static_cast<uint32_t>(bit_buffer & 0xFFFFFFFFull); + else + result = static_cast<uint32_t>(bit_buffer & ((1ull << width) - 1)); + bit_buffer >>= width; + bit_count -= width; + return result; + } + + inline void AlignToByte() { + uint32_t drop = bit_count % 8; + if (drop) { + bit_buffer >>= drop; + bit_count -= drop; + } + } +}; + +/* +Overall layout: + DeltaBlockHeader (struct, fixed-size) + - uint32 value_per_block + - uint32 
values_per_mini_block + - uint32 total_count + T first_value + [Repeated Block until total_count is exhausted] + - uint32 min_delta + - uint8 bit_widths[ mini_blocks_per_block ] + - uint32 payload_size + - uint8 payload[payload_size] + // bit-packed adjusted deltas, mini-block by mini-block + // within a block: bits are written LSB-first, end aligned to byte +*/ + +template <typename T> +size_t PaxDeltaEncoder<T>::GetBoundSize(size_t src_len) const { + size_t value_count = src_len / sizeof(T); + size_t block_count = (value_count + value_per_block_ - 1) / value_per_block_; + /* header + first_value + block_count * (min_delta + bit_widths + payload_size + * + payload) */ + return sizeof(DeltaBlockHeader) + sizeof(T) + + block_count * (sizeof(uint32) + mini_blocks_per_block_ + + sizeof(uint32) + sizeof(uint32)); +} + +template <typename T> +void PaxDeltaEncoder<T>::Encode(T *data, size_t count) { + // Estimate allocation: by element byte count, sufficient to accommodate + // header and bit stream + if (result_buffer_->Capacity() < + count * sizeof(T) + sizeof(DeltaBlockHeader) + sizeof(T)) { + result_buffer_->ReSize(count * sizeof(T) + sizeof(DeltaBlockHeader) + + sizeof(T)); + } + + DeltaBlockHeader header; + header.value_per_block = value_per_block_; + header.values_per_mini_block = values_per_mini_block_; + header.total_count = count; + // add delta block header + result_buffer_->Write(reinterpret_cast<char *>(&header), sizeof(header)); + result_buffer_->Brush(sizeof(header)); + // add base value + result_buffer_->Write(reinterpret_cast<char *>(&data[0]), sizeof(data[0])); + result_buffer_->Brush(sizeof(data[0])); + + size_t values_emitted = 1; + T previous_value = data[0]; + + while (values_emitted < count) { + uint32_t values_in_block = std::min( + value_per_block_, static_cast<uint32_t>(count - values_emitted)); + + if (deltas_scratch_.size() < values_in_block) { + deltas_scratch_.resize(values_in_block); + } + uint32_t *deltas = deltas_scratch_.data(); + 
uint32_t min_delta = UINT32_MAX; + uint32_t mini_max[mini_blocks_per_block_] = {0}; + + for (uint32_t i = 0; i < values_in_block; ++i) { + T current = data[values_emitted + i]; + uint32_t delta = static_cast<uint32_t>(current - previous_value); + deltas[i] = delta; + previous_value = current; + if (delta < min_delta) min_delta = delta; + uint32_t mini_index = i / values_per_mini_block_; + if (delta > mini_max[mini_index]) mini_max[mini_index] = delta; + } + + // write block header: min_delta later + + uint8_t bit_widths[mini_blocks_per_block_] = {0}; + uint64_t total_bits = 0; + for (uint32_t i = 0; i < mini_blocks_per_block_; ++i) { + uint32_t start = i * values_per_mini_block_; + if (start >= values_in_block) { + bit_widths[i] = 0; + continue; + } + uint32_t adjusted_max = + (mini_max[i] >= min_delta) ? (mini_max[i] - min_delta) : 0; Review Comment: Always mini_max[i] >= min_delta? ########## contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc: ########## @@ -0,0 +1,517 @@ +/*------------------------------------------------------------------------- + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + * pax_delta_encoding.cc + * + * IDENTIFICATION + * contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc + * + *------------------------------------------------------------------------- + */ +#include "storage/columns/pax_delta_encoding.h" + +#include <algorithm> +#include <cstring> +#include <vector> + +namespace pax { + +// delta bitpack encoder +template <typename T> +PaxDeltaEncoder<T>::PaxDeltaEncoder(const EncodingOption &encoder_options) + : PaxEncoder(encoder_options) {} + +template <typename T> +void PaxDeltaEncoder<T>::Append(char *data, size_t size) { + CBDB_CHECK(!has_append_, cbdb::CException::kExTypeAbort, + fmt("PaxDeltaEncoder::Append only support Append Once")); + has_append_ = true; + + auto T_data = reinterpret_cast<T *>(data); + auto T_data_len = size / sizeof(T); + Encode(T_data, T_data_len); +} + +inline uint8_t NumBitsAllowZero(uint32_t value) { + if (value == 0) return 0; + uint8_t bits = 0; + while (value) { + bits++; + value >>= 1; + } + return bits; +} + +// Fast bit width calculation (0 -> 0) +inline uint8_t FastNumBits(uint32_t v) { +#if defined(__GNUC__) || defined(__clang__) + return v == 0 ? 0 : static_cast<uint8_t>(32 - __builtin_clz(v)); +#else + uint8_t bits = 0; + while (v) { + ++bits; + v >>= 1; + } + return bits; +#endif +} + +// 64-bit bit writer based on raw pointer (writes to reserved DataBuffer range) +struct BitWriter64Ptr { + uint8_t *out; + size_t capacity; Review Comment: unused variable capacity ########## contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc: ########## @@ -0,0 +1,517 @@ +/*------------------------------------------------------------------------- + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * pax_delta_encoding.cc + * + * IDENTIFICATION + * contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.cc + * + *------------------------------------------------------------------------- + */ +#include "storage/columns/pax_delta_encoding.h" + +#include <algorithm> +#include <cstring> +#include <vector> + +namespace pax { + +// delta bitpack encoder +template <typename T> +PaxDeltaEncoder<T>::PaxDeltaEncoder(const EncodingOption &encoder_options) + : PaxEncoder(encoder_options) {} + +template <typename T> +void PaxDeltaEncoder<T>::Append(char *data, size_t size) { + CBDB_CHECK(!has_append_, cbdb::CException::kExTypeAbort, + fmt("PaxDeltaEncoder::Append only support Append Once")); + has_append_ = true; + + auto T_data = reinterpret_cast<T *>(data); + auto T_data_len = size / sizeof(T); + Encode(T_data, T_data_len); +} + +inline uint8_t NumBitsAllowZero(uint32_t value) { + if (value == 0) return 0; + uint8_t bits = 0; + while (value) { + bits++; + value >>= 1; + } + return bits; +} + +// Fast bit width calculation (0 -> 0) +inline uint8_t FastNumBits(uint32_t v) { +#if defined(__GNUC__) || defined(__clang__) + return v == 0 ? 
0 : static_cast<uint8_t>(32 - __builtin_clz(v)); +#else + uint8_t bits = 0; + while (v) { + ++bits; + v >>= 1; + } + return bits; +#endif +} + +// 64-bit bit writer based on raw pointer (writes to reserved DataBuffer range) +struct BitWriter64Ptr { + uint8_t *out; + size_t capacity; + size_t index; + uint64_t bit_buffer; + uint32_t bit_count; + + BitWriter64Ptr(uint8_t *p, size_t cap) + : out(p), capacity(cap), index(0), bit_buffer(0), bit_count(0) {} + + inline void Write(uint32_t value, uint8_t width) { Review Comment: Append instead of Write? ########## contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.h: ########## @@ -0,0 +1,165 @@ +/*------------------------------------------------------------------------- + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * pax_delta_encoding.h + * + * IDENTIFICATION + * contrib/pax_storage/src/cpp/storage/columns/pax_delta_encoding.h + * + *------------------------------------------------------------------------- + */ +#pragma once + +#include "storage/columns/pax_encoding.h" +#include "storage/columns/pax_decoding.h" +#include <vector> + +namespace pax { + +struct BitWriter64 { Review Comment: unused BitWriter64? 
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org
For additional commands, e-mail: commits-h...@cloudberry.apache.org