This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 2c2e9117 Fix large page read and bitmap index (#427)
2c2e9117 is described below
commit 2c2e91176f316c19cee962a50926a6e95ae97dce
Author: Jiang Tian <[email protected]>
AuthorDate: Tue Mar 4 14:18:41 2025 +0800
Fix large page read and bitmap index (#427)
* Fix large page read and bitmap index
* revert index
* fix ts_2diff decode
* revert cur_value_index when fail to add row
* revert test
---
cpp/src/cwrapper/tsfile_cwrapper.cc | 2 +-
cpp/src/encoding/ts2diff_decoder.h | 8 ++--
cpp/src/encoding/ts2diff_encoder.h | 8 ++--
cpp/src/reader/aligned_chunk_reader.cc | 24 +++++++-----
cpp/src/reader/aligned_chunk_reader.h | 2 +-
.../reader/block/single_device_tsblock_reader.cc | 1 -
cpp/src/reader/qds_without_timegenerator.cc | 3 +-
cpp/src/reader/table_query_executor.h | 2 +-
cpp/src/writer/tsfile_writer.cc | 2 +-
cpp/test/encoding/ts2diff_codec_test.cc | 44 +++++++++++++++++++++-
.../reader/table_view/tsfile_reader_table_test.cc | 15 +++++++-
11 files changed, 85 insertions(+), 26 deletions(-)
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc
index 7e43ac80..e18c843a 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.cc
+++ b/cpp/src/cwrapper/tsfile_cwrapper.cc
@@ -37,7 +37,7 @@ static bool is_init = false;
void init_tsfile_config() {
if (!is_init) {
- common::init_config_value();
+ common::init_common();
is_init = true;
}
}
diff --git a/cpp/src/encoding/ts2diff_decoder.h b/cpp/src/encoding/ts2diff_decoder.h
index 76d77dc0..aa740fa7 100644
--- a/cpp/src/encoding/ts2diff_decoder.h
+++ b/cpp/src/encoding/ts2diff_decoder.h
@@ -74,6 +74,7 @@ class TS2DIFFDecoder : public Decoder {
int64_t read_long(int bits, common::ByteStream &in) {
int64_t value = 0;
while (bits > 0) {
+ read_byte_if_empty(in);
if (bits > bits_left_ || bits == 8) {
// Take only the bits_left_ "least significant" bits.
uint8_t d = (uint8_t)(buffer_ & ((1 << bits_left_) - 1));
@@ -93,7 +94,6 @@ class TS2DIFFDecoder : public Decoder {
if (bits <= 0 && current_index_ == 0) {
break;
}
- read_byte_if_empty(in);
}
return value;
}
@@ -129,14 +129,15 @@ inline int32_t TS2DIFFDecoder<int32_t>::decode(common::ByteStream &in) {
ret_value = first_value_;
bits_left_ = 0;
buffer_ = 0;
- read_byte_if_empty(in);
current_index_ = 1;
return ret_value;
}
if (current_index_++ >= write_index_) {
current_index_ = 0;
}
- stored_value_ = (int32_t)read_long(bit_width_, in);
+ // although it seems we are reading an int64, bit_width_ guarantees
+ // that it does not overflow int32
+ stored_value_ = read_long(bit_width_, in);
ret_value = stored_value_ + first_value_ + delta_min_;
first_value_ = ret_value;
@@ -151,7 +152,6 @@ inline int64_t TS2DIFFDecoder<int64_t>::decode(common::ByteStream &in) {
common::SerializationUtil::read_i64(delta_min_, in);
common::SerializationUtil::read_i64(first_value_, in);
ret_value = first_value_;
- read_byte_if_empty(in);
current_index_ = 1;
return ret_value;
}
diff --git a/cpp/src/encoding/ts2diff_encoder.h b/cpp/src/encoding/ts2diff_encoder.h
index 5e456bc7..a1c5a534 100644
--- a/cpp/src/encoding/ts2diff_encoder.h
+++ b/cpp/src/encoding/ts2diff_encoder.h
@@ -200,10 +200,10 @@ inline int TS2DIFFEncoder<int64_t>::flush(common::ByteStream &out_stream) {
// Calculate the bit length of each value to writer
int bit_width = cal_bit_width(delta_arr_max_ - delta_arr_min_);
// writer header
- common::SerializationUtil::write_ui32(write_index_, out_stream);
- common::SerializationUtil::write_ui32(bit_width, out_stream);
- common::SerializationUtil::write_ui64(delta_arr_min_, out_stream);
- common::SerializationUtil::write_ui64(first_value_, out_stream);
+ common::SerializationUtil::write_i32(write_index_, out_stream);
+ common::SerializationUtil::write_i32(bit_width, out_stream);
+ common::SerializationUtil::write_i64(delta_arr_min_, out_stream);
+ common::SerializationUtil::write_i64(first_value_, out_stream);
// writer data
for (int i = 0; i < write_index_; i++) {
write_bits(delta_arr_[i], bit_width, out_stream);
diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc
index a6ce3e7b..6b89e801 100644
--- a/cpp/src/reader/aligned_chunk_reader.cc
+++ b/cpp/src/reader/aligned_chunk_reader.cc
@@ -289,7 +289,7 @@ int AlignedChunkReader::get_cur_page_header(ChunkMeta *&chunk_meta,
// @in_stream_
int AlignedChunkReader::read_from_file_and_rewrap(
common::ByteStream &in_stream_, ChunkMeta *&chunk_meta,
- uint32_t &chunk_visit_offset, int32_t file_data_buf_size, int want_size) {
+ uint32_t &chunk_visit_offset, int32_t &file_data_buf_size, int want_size) {
int ret = E_OK;
const int DEFAULT_READ_SIZE = 4096; // may use page_size + page_header_size
char *file_data_buf = in_stream_.get_wrapped_buf();
@@ -350,8 +350,8 @@ int AlignedChunkReader::decode_cur_time_page_data() {
// << cur_page_header_.compressed_size_ << std::endl;
if (RET_FAIL(read_from_file_and_rewrap(
time_in_stream_, time_chunk_meta_, time_chunk_visit_offset_,
- cur_time_page_header_.compressed_size_,
- file_data_time_buf_size_))) {
+ file_data_time_buf_size_,
+ cur_value_page_header_.compressed_size_))) {
}
}
@@ -429,8 +429,8 @@ int AlignedChunkReader::decode_cur_value_page_data() {
// << cur_page_header_.compressed_size_ << std::endl;
if (RET_FAIL(read_from_file_and_rewrap(
value_in_stream_, value_chunk_meta_, value_chunk_visit_offset_,
- cur_value_page_header_.compressed_size_,
- file_data_value_buf_size_))) {
+ file_data_value_buf_size_,
+ cur_value_page_header_.compressed_size_))) {
}
}
@@ -529,19 +529,26 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
int64_t time = 0; \
CppType value; \
while ((time_decoder_->has_remaining() || time_in.has_remaining()) \
- && (value_decoder_->has_remaining() || \
- value_in.has_remaining())){ \
+ && (value_decoder_->has_remaining() || \
+ value_in.has_remaining())){ \
cur_value_index++; \
if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & \
0xFF) & \
(mask >> (cur_value_index % 8))) == 0) { \
- RET_FAIL(time_decoder_->read_int64(time, time_in)); \
+ ret = time_decoder_->read_int64(time, time_in); \
if (ret != E_OK) { \
break; \
} \
+ ret = value_decoder_->read_##ReadType(value, \
+ value_in); \
+ if (ret != E_OK) { \
+ break; \
+ } \
+ continue; \
} \
if (UNLIKELY(!row_appender.add_row())) { \
ret = E_OVERFLOW; \
+ cur_value_index--; \
break; \
} else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) { \
} else if (RET_FAIL(value_decoder_->read_##ReadType(value, \
@@ -569,7 +576,6 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
while ((time_decoder_->has_remaining() &&
value_decoder_->has_remaining()) ||
(time_in.has_remaining() && value_in.has_remaining())) {
- cur_value_index++;
if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) &
(mask >> (cur_value_index % 8))) == 0) {
RET_FAIL(time_decoder_->read_int64(time, time_in));
diff --git a/cpp/src/reader/aligned_chunk_reader.h b/cpp/src/reader/aligned_chunk_reader.h
index 365e9efe..becca806 100644
--- a/cpp/src/reader/aligned_chunk_reader.h
+++ b/cpp/src/reader/aligned_chunk_reader.h
@@ -94,7 +94,7 @@ class AlignedChunkReader : public IChunkReader {
int read_from_file_and_rewrap(common::ByteStream &in_stream_,
ChunkMeta *&chunk_meta,
uint32_t &chunk_visit_offset,
- int32_t file_data_buf_size,
+ int32_t &file_data_buf_size,
int want_size = 0);
bool cur_page_statisify_filter(Filter *filter);
int skip_cur_page();
diff --git a/cpp/src/reader/block/single_device_tsblock_reader.cc b/cpp/src/reader/block/single_device_tsblock_reader.cc
index e8ee6693..2c62d87f 100644
--- a/cpp/src/reader/block/single_device_tsblock_reader.cc
+++ b/cpp/src/reader/block/single_device_tsblock_reader.cc
@@ -39,7 +39,6 @@ int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task,
int ret = common::E_OK;
pa_.init(512, common::AllocModID::MOD_TSFILE_READER);
tuple_desc_.reset();
- common::init_common();
auto table_schema = device_query_task->get_table_schema();
tuple_desc_.push_back(common::g_time_column_schema);
for (const auto& column_name : device_query_task_->get_column_names()) {
diff --git a/cpp/src/reader/qds_without_timegenerator.cc b/cpp/src/reader/qds_without_timegenerator.cc
index 0a4a6f38..805c9c4d 100644
--- a/cpp/src/reader/qds_without_timegenerator.cc
+++ b/cpp/src/reader/qds_without_timegenerator.cc
@@ -68,7 +68,8 @@ int QDSWithoutTimeGenerator::init(TsFileIOReader *io_reader,
for (size_t i = 0; i < path_count; i++) {
get_next_tsblock(i, true);
- data_types.push_back(value_iters_[i]->get_data_type());
+ data_types.push_back(value_iters_[i] != nullptr ?
+ value_iters_[i]->get_data_type() : TSDataType::NULL_TYPE);
}
result_set_metadata_ =
std::make_shared<ResultSetMetadata>(column_names, data_types);
diff --git a/cpp/src/reader/table_query_executor.h b/cpp/src/reader/table_query_executor.h
index 32d522c0..e9a3c513 100644
--- a/cpp/src/reader/table_query_executor.h
+++ b/cpp/src/reader/table_query_executor.h
@@ -50,7 +50,7 @@ class TableQueryExecutor {
tsfile_io_reader_->init(read_file);
meta_data_querier_ = new MetadataQuerier(tsfile_io_reader_);
table_query_ordering_ = TableQueryOrdering::DEVICE;
- block_size_ = 1024;
+ block_size_ = 10240;
}
~TableQueryExecutor() {
if (meta_data_querier_ != nullptr) {
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 8f88ec03..88e2927d 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -38,7 +38,7 @@ int libtsfile_init() {
}
ModStat::get_instance().init();
- init_config_value();
+ init_common();
g_s_is_inited = true;
return E_OK;
diff --git a/cpp/test/encoding/ts2diff_codec_test.cc b/cpp/test/encoding/ts2diff_codec_test.cc
index cd6197cf..15836308 100644
--- a/cpp/test/encoding/ts2diff_codec_test.cc
+++ b/cpp/test/encoding/ts2diff_codec_test.cc
@@ -58,7 +58,7 @@ class TS2DIFFCodecTest : public ::testing::Test {
LongTS2DIFFDecoder* decoder_long_;
};
-TEST_F(TS2DIFFCodecTest, TestIntEncoding) {
+TEST_F(TS2DIFFCodecTest, TestIntEncoding1) {
common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false);
const int row_num = 10000;
int32_t data[row_num];
@@ -79,7 +79,49 @@ TEST_F(TS2DIFFCodecTest, TestIntEncoding) {
}
}
+TEST_F(TS2DIFFCodecTest, TestIntEncoding2) {
+ common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false);
+ const int row_num = 10000;
+ int32_t data[row_num];
+ memset(data, 0, sizeof(int32_t) * row_num);
+ for (int i = 0; i < row_num; i++) {
+ data[i] = i;
+ }
+
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(encoder_int_->encode(data[i], out_stream), common::E_OK);
+ }
+ EXPECT_EQ(encoder_int_->flush(out_stream), common::E_OK);
+
+ int32_t x;
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(decoder_int_->read_int32(x, out_stream), common::E_OK);
+ EXPECT_EQ(x, data[i]);
+ }
+}
+
TEST_F(TS2DIFFCodecTest, TestLongEncoding) {
+ common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false);
+ const int row_num = 10000;
+ int64_t data[row_num];
+ memset(data, 0, sizeof(int64_t) * row_num);
+ for (int i = 0; i < row_num; i++) {
+ data[i] = i;
+ }
+
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(encoder_long_->encode(data[i], out_stream), common::E_OK);
+ }
+ EXPECT_EQ(encoder_long_->flush(out_stream), common::E_OK);
+
+ int64_t x;
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(decoder_long_->read_int64(x, out_stream), common::E_OK);
+ EXPECT_EQ(x, data[i]);
+ }
+}
+
+TEST_F(TS2DIFFCodecTest, TestLongEncoding2) {
common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false);
const int row_num = 10000;
int64_t data[row_num];
diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
index 1fb787e8..0d8f6832 100644
--- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc
+++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
@@ -102,7 +102,8 @@ class TsFileTableReaderTest : public ::testing::Test {
storage::Tablet tablet(table_schema->get_table_name(),
table_schema->get_measurement_names(),
table_schema->get_data_types(),
- table_schema->get_column_categories());
+ table_schema->get_column_categories(),
+ device_num * num_timestamp_per_device);
char* literal = new char[std::strlen("device_id") + 1];
std::strcpy(literal, "device_id");
@@ -199,8 +200,18 @@ class TsFileTableReaderTest : public ::testing::Test {
TEST_F(TsFileTableReaderTest, TableModelQuery) { test_table_model_query(); }
-TEST_F(TsFileTableReaderTest, TableModelQueryOnePage) {
+TEST_F(TsFileTableReaderTest, TableModelQueryOneSmallPage) {
+ int prev_config = g_config_value_.page_writer_max_point_num_;
+ g_config_value_.page_writer_max_point_num_ = 5;
test_table_model_query(g_config_value_.page_writer_max_point_num_);
+ g_config_value_.page_writer_max_point_num_ = prev_config;
+}
+
+TEST_F(TsFileTableReaderTest, TableModelQueryOneLargePage) {
+ int prev_config = g_config_value_.page_writer_max_point_num_;
+ g_config_value_.page_writer_max_point_num_ = 10000;
+ test_table_model_query(g_config_value_.page_writer_max_point_num_);
+ g_config_value_.page_writer_max_point_num_ = prev_config;
}
TEST_F(TsFileTableReaderTest, TableModelResultMetadata) {