This is an automated email from the ASF dual-hosted git repository.

colinlee pushed a commit to branch fix_overflow
in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 5460aa7f08c616211b626539a4be83a286786db9 Author: ColinLee <[email protected]> AuthorDate: Tue Feb 24 17:03:38 2026 +0800 Fix inability to read files exceeding int32 size limit --- cpp/examples/c_examples/demo_read.c | 5 +-- cpp/src/file/read_file.cc | 8 ++--- cpp/src/file/read_file.h | 8 ++--- cpp/src/file/tsfile_io_reader.cc | 20 ++++++----- cpp/src/file/tsfile_io_reader.h | 4 +-- cpp/src/reader/chunk_reader.cc | 2 +- cpp/src/reader/device_meta_iterator.cc | 8 ++--- cpp/src/reader/filter/tag_filter.h | 3 +- cpp/src/writer/chunk_writer.cc | 2 +- cpp/src/writer/chunk_writer.h | 5 +-- cpp/test/reader/tsfile_reader_test.cc | 65 +++++++++++++++++++++++++++++++++- 11 files changed, 97 insertions(+), 33 deletions(-) diff --git a/cpp/examples/c_examples/demo_read.c b/cpp/examples/c_examples/demo_read.c index 1d5a1199..5ac6111e 100644 --- a/cpp/examples/c_examples/demo_read.c +++ b/cpp/examples/c_examples/demo_read.c @@ -73,8 +73,9 @@ ERRNO read_tsfile() { break; case TS_DATATYPE_INT64: printf("%lld\n", - (long long)tsfile_result_set_get_value_by_index_int64_t(ret, - i)); + (long long) + tsfile_result_set_get_value_by_index_int64_t( + ret, i)); break; case TS_DATATYPE_FLOAT: printf( diff --git a/cpp/src/file/read_file.cc b/cpp/src/file/read_file.cc index 6fa9b809..1807883a 100644 --- a/cpp/src/file/read_file.cc +++ b/cpp/src/file/read_file.cc @@ -65,14 +65,14 @@ int ReadFile::open(const std::string& file_path) { return ret; } -int ReadFile::get_file_size(int32_t& file_size) { +int ReadFile::get_file_size(int64_t& file_size) { struct stat s; if (fstat(fd_, &s) < 0) { LOGE("fstat error, file_path=" << file_path_.c_str() << "fd=" << fd_ << "errno" << errno); return E_FILE_STAT_ERR; } - file_size = s.st_size; + file_size = static_cast<int64_t>(s.st_size); return E_OK; } @@ -109,13 +109,13 @@ int ReadFile::check_file_magic() { return ret; } -int ReadFile::read(int32_t offset, char* buf, int32_t buf_size, +int ReadFile::read(int64_t offset, char* buf, int32_t buf_size, 
int32_t& read_len) { int ret = E_OK; read_len = 0; while (read_len < buf_size) { ssize_t pread_size = ::pread(fd_, buf + read_len, buf_size - read_len, - offset + read_len); + static_cast<off_t>(offset + read_len)); if (pread_size < 0) { ret = E_FILE_READ_ERR; ////log_err("tsfile reader error, file_path=%s, errno=%d", diff --git a/cpp/src/file/read_file.h b/cpp/src/file/read_file.h index 64fe25f5..c3894009 100644 --- a/cpp/src/file/read_file.h +++ b/cpp/src/file/read_file.h @@ -37,19 +37,19 @@ class ReadFile { int open(const std::string& file_path); FORCE_INLINE bool is_opened() const { return fd_ > 0; } - FORCE_INLINE int32_t file_size() const { return file_size_; } + FORCE_INLINE int64_t file_size() const { return file_size_; } FORCE_INLINE const std::string& file_path() const { return file_path_; } /* * try to reader @buf_size bytes from @offset of this file * into @buf. @read_len return the actual len reader. */ - int read(int32_t offset, char* buf, int32_t buf_size, + int read(int64_t offset, char* buf, int32_t buf_size, int32_t& ret_read_len); void close(); private: - int get_file_size(int32_t& file_size); + int get_file_size(int64_t& file_size); int check_file_magic(); private: @@ -59,7 +59,7 @@ class ReadFile { private: std::string file_path_; int fd_; - int32_t file_size_; + int64_t file_size_; }; } // end namespace storage diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc index e16b6b4a..8bb0c0c5 100644 --- a/cpp/src/file/tsfile_io_reader.cc +++ b/cpp/src/file/tsfile_io_reader.cc @@ -135,18 +135,20 @@ int TsFileIOReader::load_tsfile_meta() { int ret = E_OK; uint32_t tsfile_meta_size = 0; - int32_t read_offset = 0; + int64_t read_offset = 0; int32_t ret_read_len = 0; // Step 1: reader the tsfile_meta_size // 1.1 prepare reader buffer - int32_t alloc_size = UTIL_MIN(TSFILE_READ_IO_SIZE, file_size()); + const int64_t fsize = file_size(); + const int32_t alloc_size = static_cast<int32_t>( + 
UTIL_MIN(static_cast<int64_t>(TSFILE_READ_IO_SIZE), fsize)); char* read_buf = (char*)mem_alloc(alloc_size, MOD_TSFILE_READER); if (IS_NULL(read_buf)) { return E_OOM; } // 1.2 reader data from file - read_offset = file_size() - alloc_size; + read_offset = fsize - alloc_size; ret_read_len = 0; if (RET_FAIL(read_file_->read(read_offset, read_buf, alloc_size, ret_read_len))) { @@ -177,7 +179,7 @@ int TsFileIOReader::load_tsfile_meta() { read_buf = old_read_buf; ret = E_OOM; } else if (RET_FAIL(read_file_->read( - file_size() - tsfile_meta_size - + fsize - tsfile_meta_size - TAIL_MAGIC_AND_META_SIZE_SIZE, read_buf, tsfile_meta_size, ret_read_len))) { } else if (tsfile_meta_size != (uint32_t)ret_read_len) { @@ -226,7 +228,7 @@ int TsFileIOReader::load_timeseries_index_for_ssi( } auto& pa = ssi->timeseries_index_pa_; - int start_offset = device_index_entry->get_offset(), + int64_t start_offset = device_index_entry->get_offset(), end_offset = device_ie_end_offset; ASSERT(start_offset < end_offset); const int32_t read_size = end_offset - start_offset; @@ -387,8 +389,8 @@ int TsFileIOReader::load_all_measurement_index_entry( return ret; } -int TsFileIOReader::read_device_meta_index(int32_t start_offset, - int32_t end_offset, +int TsFileIOReader::read_device_meta_index(int64_t start_offset, + int64_t end_offset, common::PageArena& pa, MetaIndexNode*& device_meta_index, bool leaf) { @@ -428,7 +430,7 @@ int TsFileIOReader::get_timeseries_indexes( return ret; } - int start_offset = device_index_entry->get_offset(), + int64_t start_offset = device_index_entry->get_offset(), end_offset = device_ie_end_offset; ASSERT(start_offset < end_offset); const int32_t read_size = end_offset - start_offset; @@ -577,7 +579,7 @@ int TsFileIOReader::get_time_column_metadata( return ret; } char* ti_buf = nullptr; - int start_idx = 0, end_idx = 0; + int64_t start_idx = 0, end_idx = 0; int ret_read_len = 0; if (measurement_node->node_type_ == LEAF_MEASUREMENT) { ByteStream buffer; diff --git 
a/cpp/src/file/tsfile_io_reader.h b/cpp/src/file/tsfile_io_reader.h index a62ccc14..19bcfea0 100644 --- a/cpp/src/file/tsfile_io_reader.h +++ b/cpp/src/file/tsfile_io_reader.h @@ -75,7 +75,7 @@ class TsFileIOReader { int get_chunk_metadata_list(IDeviceID device_id, std::string measurement, std::vector<ChunkMeta*>& chunk_meta_list); - int read_device_meta_index(int32_t start_offset, int32_t end_offset, + int read_device_meta_index(int64_t start_offset, int64_t end_offset, common::PageArena& pa, MetaIndexNode*& device_meta_index, bool leaf); int get_timeseries_indexes( @@ -85,7 +85,7 @@ class TsFileIOReader { common::PageArena& pa); private: - FORCE_INLINE int32_t file_size() const { return read_file_->file_size(); } + FORCE_INLINE int64_t file_size() const { return read_file_->file_size(); } int load_tsfile_meta(); diff --git a/cpp/src/reader/chunk_reader.cc b/cpp/src/reader/chunk_reader.cc index 91de1e14..1b3160b7 100644 --- a/cpp/src/reader/chunk_reader.cc +++ b/cpp/src/reader/chunk_reader.cc @@ -228,7 +228,7 @@ int ChunkReader::read_from_file_and_rewrap(int want_size) { int ret = E_OK; const int DEFAULT_READ_SIZE = 4096; // may use page_size + page_header_size char* file_data_buf = in_stream_.get_wrapped_buf(); - int offset = chunk_meta_->offset_of_chunk_header_ + chunk_visit_offset_; + int64_t offset = chunk_meta_->offset_of_chunk_header_ + chunk_visit_offset_; int read_size = (want_size < DEFAULT_READ_SIZE ? 
DEFAULT_READ_SIZE : want_size); if (file_data_buf_size_ < read_size || diff --git a/cpp/src/reader/device_meta_iterator.cc b/cpp/src/reader/device_meta_iterator.cc index be740852..b1fc939f 100644 --- a/cpp/src/reader/device_meta_iterator.cc +++ b/cpp/src/reader/device_meta_iterator.cc @@ -79,8 +79,8 @@ int DeviceMetaIterator::load_leaf_device(MetaIndexNode* meta_index_node) { continue; } } - int32_t start_offset = child->get_offset(); - int32_t end_offset = i + 1 < leaf_children.size() + int64_t start_offset = child->get_offset(); + int64_t end_offset = i + 1 < leaf_children.size() ? leaf_children[i + 1]->get_offset() : meta_index_node->end_offset_; MetaIndexNode* child_node = nullptr; @@ -104,8 +104,8 @@ int DeviceMetaIterator::load_internal_node(MetaIndexNode* meta_index_node) { for (size_t i = 0; i < internal_children.size(); i++) { std::shared_ptr<IMetaIndexEntry> child = internal_children[i]; - int32_t start_offset = child->get_offset(); - int32_t end_offset = (i + 1 < internal_children.size()) + int64_t start_offset = child->get_offset(); + int64_t end_offset = (i + 1 < internal_children.size()) ? 
internal_children[i + 1]->get_offset() : meta_index_node->end_offset_; diff --git a/cpp/src/reader/filter/tag_filter.h b/cpp/src/reader/filter/tag_filter.h index b3b03d95..b858be8c 100644 --- a/cpp/src/reader/filter/tag_filter.h +++ b/cpp/src/reader/filter/tag_filter.h @@ -35,7 +35,8 @@ class TagFilter : public Filter { TagFilter(int col_idx, std::string tag_value); ~TagFilter() override; - bool satisfyRow(int time, std::vector<std::string*> segments) const override; + bool satisfyRow(int time, + std::vector<std::string*> segments) const override; virtual bool satisfyRow(std::vector<std::string*> segments) const; std::string value_; diff --git a/cpp/src/writer/chunk_writer.cc b/cpp/src/writer/chunk_writer.cc index d9f9603d..da181133 100644 --- a/cpp/src/writer/chunk_writer.cc +++ b/cpp/src/writer/chunk_writer.cc @@ -160,7 +160,7 @@ int ChunkWriter::end_encode_chunk() { chunk_header_.data_size_ = chunk_data_.total_size(); chunk_header_.num_of_pages_ = num_of_pages_; } - } else if (first_page_statistic_ != nullptr) { + } else if (first_page_statistic_->count_ != 0) { ret = write_first_page_data(chunk_data_, false); if (E_OK == ret) { free_first_writer_data(); diff --git a/cpp/src/writer/chunk_writer.h b/cpp/src/writer/chunk_writer.h index fc1a7113..3032ff9a 100644 --- a/cpp/src/writer/chunk_writer.h +++ b/cpp/src/writer/chunk_writer.h @@ -139,10 +139,7 @@ class ChunkWriter { FORCE_INLINE void free_first_writer_data() { // free memory first_page_data_.destroy(); - if (first_page_statistic_ != nullptr) { - StatisticFactory::free(first_page_statistic_); - first_page_statistic_ = nullptr; - } + first_page_statistic_->reset(); } int seal_cur_page(bool end_chunk); void save_first_page_data(PageWriter& first_page_writer); diff --git a/cpp/test/reader/tsfile_reader_test.cc b/cpp/test/reader/tsfile_reader_test.cc index 6d4edd5a..b426d7ec 100644 --- a/cpp/test/reader/tsfile_reader_test.cc +++ b/cpp/test/reader/tsfile_reader_test.cc @@ -19,6 +19,7 @@ #include 
"reader/tsfile_reader.h" #include <gtest/gtest.h> +#include <sys/stat.h> #include <random> #include <vector> @@ -52,7 +53,7 @@ class TsFileReaderTest : public ::testing::Test { void TearDown() override { delete tsfile_writer_; - remove(file_name_.c_str()); + // remove(file_name_.c_str()); libtsfile_destroy(); } @@ -237,3 +238,65 @@ TEST_F(TsFileReaderTest, GetTimeseriesSchema) { ASSERT_EQ(measurement_schemas[1].data_type_, TSDataType::INT32); reader.close(); } + +static const int64_t kLargeFileNumRecords = 300000000; +static const int64_t kLargeFileFlushBatch = 100000; + +TEST_F(TsFileReaderTest, + DISABLED_LargeFileNoEncodingNoCompression_WriteAndRead) { + std::string device_path = "device1"; + std::string measurement_name = "temperature"; + common::TSDataType data_type = common::TSDataType::INT64; + common::TSEncoding encoding = common::TSEncoding::PLAIN; + common::CompressionType compression_type = + common::CompressionType::UNCOMPRESSED; + + tsfile_writer_->register_timeseries( + device_path, storage::MeasurementSchema(measurement_name, data_type, + encoding, compression_type)); + + const int64_t start_time = 1622505600000LL; + for (int64_t i = 0; i < kLargeFileNumRecords; ++i) { + TsRecord record(start_time + i * 1000, device_path); + record.add_point(measurement_name, static_cast<int64_t>(i)); + ASSERT_EQ(tsfile_writer_->write_record(record), E_OK); + if ((i + 1) % kLargeFileFlushBatch == 0) { + ASSERT_EQ(tsfile_writer_->flush(), E_OK); + } + } + ASSERT_EQ(tsfile_writer_->flush(), E_OK); + ASSERT_EQ(tsfile_writer_->close(), E_OK); + + std::vector<std::string> select_list = {"device1.temperature"}; + const int64_t end_time = start_time + (kLargeFileNumRecords - 1) * 1000 + 1; + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + storage::ResultSet* tmp_qds = nullptr; + ret = reader.query(select_list, start_time, end_time, tmp_qds); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_qds, nullptr); + + auto* qds 
= static_cast<QDSWithoutTimeGenerator*>(tmp_qds); + std::shared_ptr<ResultSetMetadata> meta = qds->get_metadata(); + ASSERT_NE(meta, nullptr); + ASSERT_EQ(meta->get_column_type(1), INT64); + ASSERT_EQ(meta->get_column_type(2), INT64); + + int64_t row_count = 0; + bool has_next = false; + + while (true) { + ret = qds->next(has_next); + ASSERT_EQ(ret, common::E_OK); + if (!has_next) break; + row_count++; + } + + ASSERT_EQ(row_count, kLargeFileNumRecords); + + reader.destroy_query_data_set(qds); + reader.close(); +}
