This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new e18bd44d fix read multi-large pages (#429)
e18bd44d is described below
commit e18bd44d9702c0b727e38d2ca81c77a04cefe7de
Author: Jiang Tian <[email protected]>
AuthorDate: Fri Mar 7 09:39:58 2025 +0800
fix read multi-large pages (#429)
* fix read multi-large pages
* Do not reallocate buffer when reading page header
---
cpp/src/reader/aligned_chunk_reader.cc | 11 +++++++----
cpp/src/reader/aligned_chunk_reader.h | 3 ++-
cpp/src/reader/block/single_device_tsblock_reader.cc | 15 ++++++++++-----
cpp/src/reader/block/single_device_tsblock_reader.h | 2 +-
cpp/test/reader/table_view/tsfile_reader_table_test.cc | 7 +++++++
5 files changed, 27 insertions(+), 11 deletions(-)
diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc
index 6b89e801..9884b459 100644
--- a/cpp/src/reader/aligned_chunk_reader.cc
+++ b/cpp/src/reader/aligned_chunk_reader.cc
@@ -258,13 +258,15 @@ int AlignedChunkReader::get_cur_page_header(ChunkMeta *&chunk_meta,
if (deserialize_buf_not_enough(ret) && retry) {
retry = false;
retry_read_want_size += 1024;
- int32_t file_data_buf_size =
+ int32_t &file_data_buf_size =
chunk_header.data_type_ == common::VECTOR
? file_data_time_buf_size_
: file_data_value_buf_size_;
+ // do not shrink buffer for page header, otherwise, the buffer is
+ // most likely to grow back when reading page data
if (E_OK == read_from_file_and_rewrap(
in_stream, chunk_meta, chunk_visit_offset,
- file_data_buf_size, retry_read_want_size)) {
+ file_data_buf_size, retry_read_want_size, false)) {
continue;
}
}
@@ -289,14 +291,15 @@ int AlignedChunkReader::get_cur_page_header(ChunkMeta *&chunk_meta,
// @in_stream_
int AlignedChunkReader::read_from_file_and_rewrap(
common::ByteStream &in_stream_, ChunkMeta *&chunk_meta,
- uint32_t &chunk_visit_offset, int32_t &file_data_buf_size, int want_size) {
+ uint32_t &chunk_visit_offset, int32_t &file_data_buf_size, int want_size,
+ bool may_shrink) {
int ret = E_OK;
const int DEFAULT_READ_SIZE = 4096; // may use page_size +
page_header_size
char *file_data_buf = in_stream_.get_wrapped_buf();
int offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset;
int read_size =
(want_size < DEFAULT_READ_SIZE ? DEFAULT_READ_SIZE : want_size);
- if (file_data_buf_size < read_size || read_size < file_data_buf_size / 10)
{
+ if (file_data_buf_size < read_size || (may_shrink && read_size <
file_data_buf_size / 10)) {
file_data_buf = (char *)mem_realloc(file_data_buf, read_size);
if (IS_NULL(file_data_buf)) {
return E_OOM;
diff --git a/cpp/src/reader/aligned_chunk_reader.h b/cpp/src/reader/aligned_chunk_reader.h
index becca806..0f10184a 100644
--- a/cpp/src/reader/aligned_chunk_reader.h
+++ b/cpp/src/reader/aligned_chunk_reader.h
@@ -95,7 +95,8 @@ class AlignedChunkReader : public IChunkReader {
ChunkMeta *&chunk_meta,
uint32_t &chunk_visit_offset,
int32_t &file_data_buf_size,
- int want_size = 0);
+ int want_size = 0,
+ bool may_shrink = true);
bool cur_page_statisify_filter(Filter *filter);
int skip_cur_page();
int decode_cur_time_page_data();
diff --git a/cpp/src/reader/block/single_device_tsblock_reader.cc b/cpp/src/reader/block/single_device_tsblock_reader.cc
index 2c62d87f..99c8e3ed 100644
--- a/cpp/src/reader/block/single_device_tsblock_reader.cc
+++ b/cpp/src/reader/block/single_device_tsblock_reader.cc
@@ -158,19 +158,24 @@ int SingleDeviceTsBlockReader::fill_measurements(
}
col_appenders_[time_column_index_]->append((const char*)&next_time_,
sizeof(next_time_));
- for (auto& column_contest : column_contexts) {
- column_contest->fill_into(col_appenders_);
- advance_column(column_contest);
+ for (auto& column_context : column_contexts) {
+ column_context->fill_into(col_appenders_);
+ if (RET_FAIL(advance_column(column_context))) {
+ break;
+ }
}
}
return ret;
}
-void SingleDeviceTsBlockReader::advance_column(
+int SingleDeviceTsBlockReader::advance_column(
MeasurementColumnContext* column_context) {
- if (column_context->move_iter() == common::E_NO_MORE_DATA) {
+ int ret = column_context->move_iter();
+ if (ret == common::E_NO_MORE_DATA) {
column_context->remove_from(field_column_contexts_);
+ ret = common::E_OK;
}
+ return ret;
}
void SingleMeasurementColumnContext::remove_from(
diff --git a/cpp/src/reader/block/single_device_tsblock_reader.h b/cpp/src/reader/block/single_device_tsblock_reader.h
index d9a99b05..d2949cd4 100644
--- a/cpp/src/reader/block/single_device_tsblock_reader.h
+++ b/cpp/src/reader/block/single_device_tsblock_reader.h
@@ -52,7 +52,7 @@ class SingleDeviceTsBlockReader : public TsBlockReader {
int fill_measurements(
std::vector<MeasurementColumnContext*>& column_contexts);
int fill_ids();
- void advance_column(MeasurementColumnContext* column_context);
+ int advance_column(MeasurementColumnContext* column_context);
DeviceQueryTask* device_query_task_;
Filter* field_filter_;
diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
index 0d8f6832..b1d7c257 100644
--- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc
+++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
@@ -214,6 +214,13 @@ TEST_F(TsFileTableReaderTest, TableModelQueryOneLargePage) {
g_config_value_.page_writer_max_point_num_ = prev_config;
}
+TEST_F(TsFileTableReaderTest, TableModelQueryMultiLargePage) {
+ int prev_config = g_config_value_.page_writer_max_point_num_;
+ g_config_value_.page_writer_max_point_num_ = 10000;
+ test_table_model_query(1000000);
+ g_config_value_.page_writer_max_point_num_ = prev_config;
+}
+
TEST_F(TsFileTableReaderTest, TableModelResultMetadata) {
auto table_schema = gen_table_schema(0);
auto tsfile_table_writer_ =