This is an automated email from the ASF dual-hosted git repository.

colinlee pushed a commit to branch fix_data_loss
in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 9f4ff4d4ffe5a1d750888b7b28799e8d3428de61 Author: ColinLee <[email protected]> AuthorDate: Thu Apr 9 17:07:50 2026 +0800 fix data loss. --- cpp/src/reader/aligned_chunk_reader.cc | 18 ++--- .../reader/table_view/tsfile_reader_table_test.cc | 81 ++++++++++++++++++++++ 2 files changed, 91 insertions(+), 8 deletions(-) diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc index 2d117b1c..955715d4 100644 --- a/cpp/src/reader/aligned_chunk_reader.cc +++ b/cpp/src/reader/aligned_chunk_reader.cc @@ -550,12 +550,13 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock( ((value_page_col_notnull_bitmap_[cur_value_index / 8] & \ 0xFF) & \ (mask >> (cur_value_index % 8))) == 0) { \ - ret = time_decoder_->read_int64(time, time_in); \ - if (ret != E_OK) { \ - break; \ - } \ if (UNLIKELY(!row_appender.add_row())) { \ ret = E_OVERFLOW; \ + cur_value_index--; \ + break; \ + } \ + ret = time_decoder_->read_int64(time, time_in); \ + if (ret != E_OK) { \ break; \ } \ row_appender.append(0, (char*)&time, sizeof(time)); \ @@ -596,12 +597,13 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK( if (value_page_col_notnull_bitmap_.empty() || ((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) & (mask >> (cur_value_index % 8))) == 0) { - ret = time_decoder_->read_int64(time, time_in); - if (ret != E_OK) { - break; - } if (UNLIKELY(!row_appender.add_row())) { ret = E_OVERFLOW; + cur_value_index--; + break; + } + ret = time_decoder_->read_int64(time, time_in); + if (ret != E_OK) { break; } row_appender.append(0, (char*)&time, sizeof(time)); diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc index 4b1a8259..b29b6bd4 100644 --- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc +++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc @@ -803,3 +803,84 @@ TEST_F(TsFileTableReaderTest, TestTimeColumnReader) { 
reader.destroy_query_data_set(table_result_set); ASSERT_EQ(reader.close(), common::E_OK); } + +// Regression test: AlignedChunkReader NULL branch overflow drops rows. +// When a TsBlock is full (block_size=1024) and the next row to decode is a +// NULL value in aligned data, the old code consumed the timestamp before +// checking add_row(), silently losing that row on E_OVERFLOW. +TEST_F(TsFileTableReaderTest, AlignedNullAtBlockBoundaryNoRowLoss) { + // block_size in RETURN_ROW mode is 1024. + const int32_t block_size = 1024; + // Write enough rows so that overflow happens multiple times, + // and place NULLs exactly at every block boundary. + const int32_t total_rows = block_size * 4; // 4096 rows + + std::string table_name = "null_boundary"; + auto* schema = new storage::TableSchema( + table_name, + { + common::ColumnSchema("tag1", common::TSDataType::STRING, + common::ColumnCategory::TAG), + // s_nullable: NULL at every block_size boundary + common::ColumnSchema("s_nullable", common::TSDataType::INT64, + common::ColumnCategory::FIELD), + // s_full: always has a value (control group) + common::ColumnSchema("s_full", common::TSDataType::INT64, + common::ColumnCategory::FIELD), + }); + + auto* writer = + new storage::TsFileTableWriter(&write_file_, schema, 128 * 1024 * 1024); + + storage::Tablet tablet( + {"tag1", "s_nullable", "s_full"}, + {common::TSDataType::STRING, common::TSDataType::INT64, + common::TSDataType::INT64}, + total_rows); + + for (int32_t i = 0; i < total_rows; i++) { + tablet.add_timestamp(i, static_cast<int64_t>(i)); + tablet.add_value(i, "tag1", "device0"); + tablet.add_value(i, "s_full", static_cast<int64_t>(i)); + // Make row at every block_size boundary NULL for s_nullable. + // These are exactly the rows that trigger E_OVERFLOW in the decoder. 
+ if (i % block_size != 0) { + tablet.add_value(i, "s_nullable", static_cast<int64_t>(i)); + } + // else: s_nullable is NULL at i=0, 1024, 2048, 3072 + } + + ASSERT_EQ(writer->write_table(tablet), common::E_OK); + ASSERT_EQ(writer->flush(), common::E_OK); + ASSERT_EQ(writer->close(), common::E_OK); + delete writer; + delete schema; + + storage::TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), common::E_OK); + + // Helper: query a single column and count rows. + auto count_rows = [&](const std::string& col) -> int64_t { + storage::ResultSet* rs = nullptr; + int ret = reader.query(table_name, {col}, 0, INT64_MAX, rs); + EXPECT_EQ(ret, common::E_OK); + if (rs == nullptr) return -1; + auto* trs = dynamic_cast<storage::TableResultSet*>(rs); + bool hn = false; + int64_t cnt = 0; + while (trs->next(hn) == common::E_OK && hn) { cnt++; } + reader.destroy_query_data_set(rs); + return cnt; + }; + + int64_t full_rows = count_rows("s_full"); + int64_t nullable_rows = count_rows("s_nullable"); + + // Both columns must return the same number of rows. + // Before the fix, s_nullable would lose one row per overflow at a NULL + // boundary, yielding fewer rows than s_full. + ASSERT_EQ(full_rows, total_rows); + ASSERT_EQ(nullable_rows, total_rows); + + ASSERT_EQ(reader.close(), common::E_OK); +} \ No newline at end of file
