This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch fix/tree-query-and-rle-decoder in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 6e2c1fbe084da24c945f573389acebf3474bfde3 Author: ColinLee <[email protected]> AuthorDate: Wed Apr 1 18:11:42 2026 +0800 tmp code. --- cpp/src/encoding/int32_rle_decoder.h | 52 ++++++++- cpp/src/encoding/int64_rle_decoder.h | 34 +++++- cpp/test/encoding/int32_rle_codec_test.cc | 127 +++++++++++++++++++++ .../reader/tree_view/tsfile_reader_tree_test.cc | 97 ++++++++++++++++ 4 files changed, 303 insertions(+), 7 deletions(-) diff --git a/cpp/src/encoding/int32_rle_decoder.h b/cpp/src/encoding/int32_rle_decoder.h index 402efa87..4c310510 100644 --- a/cpp/src/encoding/int32_rle_decoder.h +++ b/cpp/src/encoding/int32_rle_decoder.h @@ -109,13 +109,20 @@ class Int32RleDecoder : public Decoder { read_length_and_bitwidth(buffer); } if (current_count_ == 0) { - uint8_t header; + // The header is encoded as an unsigned varint where: + // low bit = 0 => RLE run: header_value >> 1 is the run count + // low bit = 1 => bit-packing: header_value >> 1 is the group count + uint32_t header_value = 0; int ret = common::E_OK; - if (RET_FAIL( - common::SerializationUtil::read_ui8(header, byte_cache_))) { + if (RET_FAIL(common::SerializationUtil::read_var_uint( + header_value, byte_cache_))) { return ret; } - call_read_bit_packing_buffer(header); + if (header_value & 1) { + call_read_bit_packing_buffer(header_value); + } else { + call_read_rle_run(header_value); + } } --current_count_; int32_t result = current_buffer_[bitpacking_num_ - current_count_ - 1]; @@ -125,8 +132,41 @@ class Int32RleDecoder : public Decoder { return result; } - int call_read_bit_packing_buffer(uint8_t header) { - int bit_packed_group_count = (int)(header >> 1); + int call_read_rle_run(uint32_t header_value) { + int ret = common::E_OK; + int run_length = (int)(header_value >> 1); + if (run_length <= 0) { + return common::E_DECODE_ERR; + } + int byte_width = (bit_width_ + 7) / 8; + // Read the repeated value (stored as byte_width bytes, little-endian) + int32_t value = 0; + for (int i = 0; i < byte_width; 
i++) { + uint8_t b; + if (RET_FAIL(common::SerializationUtil::read_ui8(b, byte_cache_))) { + return ret; + } + value |= ((int32_t)b) << (i * 8); + } + if (current_buffer_ != nullptr) { + common::mem_free(current_buffer_); + } + current_buffer_ = static_cast<int32_t*>( + common::mem_alloc(sizeof(int32_t) * run_length, + common::MOD_DECODER_OBJ)); + if (IS_NULL(current_buffer_)) { + return common::E_OOM; + } + for (int i = 0; i < run_length; i++) { + current_buffer_[i] = value; + } + current_count_ = run_length; + bitpacking_num_ = run_length; + return ret; + } + + int call_read_bit_packing_buffer(uint32_t header_value) { + int bit_packed_group_count = (int)(header_value >> 1); // in last bit-packing group, there may be some useless value, // lastBitPackedNum indicates how many values is useful uint8_t last_bit_packed_num; diff --git a/cpp/src/encoding/int64_rle_decoder.h b/cpp/src/encoding/int64_rle_decoder.h index 8010fe0f..4a33e051 100644 --- a/cpp/src/encoding/int64_rle_decoder.h +++ b/cpp/src/encoding/int64_rle_decoder.h @@ -112,7 +112,11 @@ class Int64RleDecoder : public Decoder { common::SerializationUtil::read_ui8(header, byte_cache_))) { return ret; } - call_read_bit_packing_buffer(header); + if (header & 1) { + call_read_bit_packing_buffer(header); + } else { + call_read_rle_run(header); + } } --current_count_; int64_t result = current_buffer_[bitpacking_num_ - current_count_ - 1]; @@ -122,6 +126,34 @@ class Int64RleDecoder : public Decoder { return result; } + int call_read_rle_run(uint8_t header) { + int ret = common::E_OK; + int run_length = (int)(header >> 1); + if (run_length <= 0) { + return common::E_DECODE_ERR; + } + int byte_width = (bit_width_ + 7) / 8; + // Read the repeated value (stored as byte_width bytes, little-endian) + int64_t value = 0; + for (int i = 0; i < byte_width; i++) { + uint8_t b; + if (RET_FAIL(common::SerializationUtil::read_ui8(b, byte_cache_))) { + return ret; + } + value |= ((int64_t)b) << (i * 8); + } + if (current_buffer_ 
!= nullptr) { + delete[] current_buffer_; + } + current_buffer_ = new int64_t[run_length]; + for (int i = 0; i < run_length; i++) { + current_buffer_[i] = value; + } + current_count_ = run_length; + bitpacking_num_ = run_length; + return ret; + } + int call_read_bit_packing_buffer(uint8_t header) { int bit_packed_group_count = (int)(header >> 1); // in last bit-packing group, there may be some useless value, diff --git a/cpp/test/encoding/int32_rle_codec_test.cc b/cpp/test/encoding/int32_rle_codec_test.cc index c580a0eb..0d27d021 100644 --- a/cpp/test/encoding/int32_rle_codec_test.cc +++ b/cpp/test/encoding/int32_rle_codec_test.cc @@ -164,4 +164,131 @@ TEST_F(Int32RleEncoderTest, EncodeFlushWithoutData) { EXPECT_EQ(stream.total_size(), 0u); } +// Helper: write a manually crafted RLE segment (Java/Parquet hybrid RLE format): +// [length_varint] [bit_width] [group_header_varint] [value_bytes...] +// run_count must be the actual count (written as (run_count<<1)|0 varint). +static void write_rle_segment(common::ByteStream& stream, uint8_t bit_width, + uint32_t run_count, int32_t value) { + common::ByteStream content(32, common::MOD_ENCODER_OBJ); + common::SerializationUtil::write_ui8(bit_width, content); + // Group header: (run_count << 1) | 0 = even varint + common::SerializationUtil::write_var_uint(run_count << 1, content); + // Value: ceil(bit_width / 8) bytes, little-endian + int byte_width = (bit_width + 7) / 8; + for (int i = 0; i < byte_width; i++) { + common::SerializationUtil::write_ui8((value >> (i * 8)) & 0xFF, + content); + } + uint32_t length = content.total_size(); + common::SerializationUtil::write_var_uint(length, stream); + // Append content bytes to stream + uint8_t buf[64]; + uint32_t read_len = 0; + content.read_buf(buf, length, read_len); + stream.write_buf(buf, read_len); +} + +// Regression test: run_count=64 requires a 2-byte LEB128 varint header +// ((64<<1)|0 = 128 = [0x80, 0x01]). 
Before the fix, only 1 byte was read, +// causing byte misalignment and incorrect decoding. +TEST_F(Int32RleEncoderTest, DecodeRleRunCountExactly64) { + common::ByteStream stream(32, common::MOD_ENCODER_OBJ); + write_rle_segment(stream, /*bit_width=*/7, /*run_count=*/64, + /*value=*/42); + + Int32RleDecoder decoder; + std::vector<int32_t> decoded; + while (decoder.has_next(stream)) { + int32_t v; + decoder.read_int32(v, stream); + decoded.push_back(v); + } + + ASSERT_EQ(decoded.size(), 64u); + for (int32_t v : decoded) { + EXPECT_EQ(v, 42); + } +} + +// Run counts of 128, 256 and 500 each need a 2-byte varint header. +TEST_F(Int32RleEncoderTest, DecodeRleRunCountLarge) { + for (uint32_t count : {128u, 256u, 500u}) { + common::ByteStream stream(64, common::MOD_ENCODER_OBJ); + write_rle_segment(stream, /*bit_width=*/8, /*run_count=*/count, + /*value=*/100); + + Int32RleDecoder decoder; + std::vector<int32_t> decoded; + while (decoder.has_next(stream)) { + int32_t v; + decoder.read_int32(v, stream); + decoded.push_back(v); + } + + ASSERT_EQ(decoded.size(), (size_t)count) + << "Failed for run_count=" << count; + for (int32_t v : decoded) { + EXPECT_EQ(v, 100); + } + } +} + +// Multiple consecutive RLE runs including large ones (simulates real sensor +// data with repeated values and occasional changes).
+TEST_F(Int32RleEncoderTest, DecodeMultipleRleRunsWithLargeCount) { + common::ByteStream stream(128, common::MOD_ENCODER_OBJ); + write_rle_segment(stream, /*bit_width=*/8, /*run_count=*/64, + /*value=*/25); + write_rle_segment(stream, /*bit_width=*/8, /*run_count=*/8, + /*value=*/26); + write_rle_segment(stream, /*bit_width=*/8, /*run_count=*/100, + /*value=*/25); + + Int32RleDecoder decoder; + std::vector<int32_t> decoded; + while (decoder.has_next(stream)) { + int32_t v; + decoder.read_int32(v, stream); + decoded.push_back(v); + } + + ASSERT_EQ(decoded.size(), 172u); // 64 + 8 + 100 + for (size_t i = 0; i < 64; i++) EXPECT_EQ(decoded[i], 25); + for (size_t i = 64; i < 72; i++) EXPECT_EQ(decoded[i], 26); + for (size_t i = 72; i < 172; i++) EXPECT_EQ(decoded[i], 25); +} + +// Regression test: Int32RleDecoder::reset() previously called delete[] on +// current_buffer_ which was allocated with mem_alloc (malloc). This is +// undefined behaviour and typically causes a crash. The fix uses mem_free. +TEST_F(Int32RleEncoderTest, ResetAfterDecodeNoCrash) { + common::ByteStream stream(1024, common::MOD_ENCODER_OBJ); + Int32RleEncoder encoder; + for (int i = 0; i < 16; i++) encoder.encode(i, stream); + encoder.flush(stream); + + Int32RleDecoder decoder; + // Decode at least one value to populate current_buffer_ via mem_alloc. + int32_t v; + ASSERT_TRUE(decoder.has_next(stream)); + decoder.read_int32(v, stream); + + // reset() must use mem_free, not delete[]. Before the fix this would crash. + decoder.reset(); + + // Verify the decoder is functional after reset. 
+ common::ByteStream stream2(1024, common::MOD_ENCODER_OBJ); + Int32RleEncoder encoder2; + std::vector<int32_t> input = {7, 7, 7, 7, 7, 7, 7, 7}; + for (int32_t x : input) encoder2.encode(x, stream2); + encoder2.flush(stream2); + + std::vector<int32_t> decoded; + while (decoder.has_next(stream2)) { + decoder.read_int32(v, stream2); + decoded.push_back(v); + } + ASSERT_EQ(decoded, input); +} + } // namespace storage diff --git a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc index aa4ff254..258db4d0 100644 --- a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc +++ b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc @@ -20,6 +20,7 @@ #include <random> +#include "reader/result_set.h" #include "common/record.h" #include "common/schema.h" #include "common/tablet.h" @@ -425,3 +426,99 @@ TEST_F(TsFileTreeReaderTest, ExtendedRowsAndColumnsTest) { delete measurement; } } + +// Regression test: query_table_on_tree on a device path with three or more +// dot-segments (e.g. "root.sensors.TH") previously SEGVed because: +// 1. StringArrayDeviceID split "root.sensors.TH" into ["root","sensors","TH"] +// instead of the correct ["root.sensors","TH"], so get_table_name() returned +// "root" instead of "root.sensors". +// 2. load_device_index_entry used operator[] on the table map which inserted a +// null entry, then asserted on it. 
+TEST_F(TsFileTreeReaderTest, QueryTableOnTreeDeepDevicePath) { + TsFileTreeWriter writer(&write_file_); + // Device paths with 3 dot-segments: table_name="root.sensors", device="TH" + std::string device_id = "root.sensors.TH"; + std::string m_temp = "temperature"; + std::string m_humi = "humidity"; + auto* ms_temp = new MeasurementSchema(m_temp, INT32); + auto* ms_humi = new MeasurementSchema(m_humi, INT32); + ASSERT_EQ(E_OK, writer.register_timeseries(device_id, ms_temp)); + ASSERT_EQ(E_OK, writer.register_timeseries(device_id, ms_humi)); + delete ms_temp; + delete ms_humi; + + for (int ts = 0; ts < 5; ts++) { + TsRecord rec(device_id, ts); + rec.add_point(m_temp, static_cast<int32_t>(20 + ts)); + rec.add_point(m_humi, static_cast<int32_t>(50 + ts)); + ASSERT_EQ(E_OK, writer.write(rec)); + } + writer.flush(); + writer.close(); + + TsFileReader reader; + ASSERT_EQ(E_OK, reader.open(file_name_)); + ResultSet* result; + // query_table_on_tree used to SEGV here due to wrong table-name lookup + ASSERT_EQ(E_OK, + reader.query_table_on_tree({m_temp, m_humi}, INT64_MIN, + INT64_MAX, result)); + + auto* trs = static_cast<storage::TableResultSet*>(result); + bool has_next = false; + int row_cnt = 0; + while (IS_SUCC(trs->next(has_next)) && has_next) { + row_cnt++; + } + EXPECT_EQ(row_cnt, 5); + reader.destroy_query_data_set(result); + reader.close(); +} + +// Regression test: load_device_index_entry previously used operator[] to look +// up the table node, which silently inserted a null entry and then asserted. +// After the fix it uses find() and returns E_DEVICE_NOT_EXIST gracefully. +// This is triggered when querying a measurement that no device in the file has. +TEST_F(TsFileTreeReaderTest, QueryTableOnTreeMissingMeasurement) { + // Use the same multi-device setup as ReadTreeByTable to ensure a valid file. 
+ TsFileTreeWriter writer(&write_file_); + std::vector<std::string> device_ids = {"root.db1.t1", "root.db2.t1"}; + std::string m_temp = "temperature"; + for (auto dev : device_ids) { + auto* ms = new MeasurementSchema(m_temp, INT32); + ASSERT_EQ(E_OK, writer.register_timeseries(dev, ms)); + delete ms; + TsRecord rec(dev, 0); + rec.add_point(m_temp, static_cast<int32_t>(25)); + ASSERT_EQ(E_OK, writer.write(rec)); + } + writer.flush(); + writer.close(); + + TsFileReader reader; + ASSERT_EQ(E_OK, reader.open(file_name_)); + ResultSet* result = nullptr; + // "nonexistent" is not present in any device. Before the fix, + // load_device_index_entry used operator[] which inserted null and crashed. + // After the fix it returns E_DEVICE_NOT_EXIST or E_COLUMN_NOT_EXIST. + int ret = reader.query_table_on_tree({"nonexistent"}, INT64_MIN, + INT64_MAX, result); + EXPECT_NE(ret, E_OK); // Must not succeed (measurement not found) + if (result != nullptr) { + reader.destroy_query_data_set(result); + } + reader.close(); +} + +TEST_F(TsFileTreeReaderTest, simpletest) { + TsFileReader reader; + reader.open("/Users/colin/Library/Containers/com.tencent.xinWeChat/Data/Documents/xwechat_files/wxid_197w1jpv66ag22_cc63/msg/file/2026-03/1761643915818-1-0-0.tsfile"); + ResultSet* result; + int ret = reader.query_table_on_tree({"t", "h"}, INT64_MIN, + INT64_MAX, result); + ASSERT_EQ(ret, E_OK); + + auto* table_result_set = (storage::TableResultSet*)result; + bool has_next = false; + print_table_result_set((table_result_set)); +}
