This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new c852f8d5 Fix demos & multi device & multi flush (#431)
c852f8d5 is described below

commit c852f8d51e1f1299059a7cd9d19029bf2d56715a
Author: Jiang Tian <[email protected]>
AuthorDate: Fri Mar 7 16:20:23 2025 +0800

    Fix demos & multi device & multi flush (#431)
    
    * fix demos & fix insertion with multiple devices
    
    * reuse chunk/page writers after flush
---
 cpp/examples/c_examples/demo_read.c                | 24 ++++-----
 cpp/examples/c_examples/demo_write.c               |  1 +
 cpp/examples/cpp_examples/demo_read.cpp            |  4 +-
 cpp/src/common/schema.h                            | 13 +++++
 cpp/src/common/tsfile_common.h                     |  4 --
 cpp/src/writer/chunk_writer.cc                     | 15 +++++-
 cpp/src/writer/chunk_writer.h                      |  1 +
 cpp/src/writer/page_writer.cc                      | 12 +++--
 cpp/src/writer/time_chunk_writer.cc                | 16 +++++-
 cpp/src/writer/time_chunk_writer.h                 |  1 +
 cpp/src/writer/time_page_writer.cc                 | 12 ++++-
 cpp/src/writer/tsfile_writer.cc                    | 62 ++++++++--------------
 cpp/src/writer/tsfile_writer.h                     | 16 +++---
 cpp/src/writer/value_chunk_writer.cc               | 16 +++++-
 cpp/src/writer/value_chunk_writer.h                |  1 +
 cpp/src/writer/value_page_writer.cc                | 12 ++++-
 cpp/test/common/tsfile_common_test.cc              |  8 +--
 .../reader/table_view/tsfile_reader_table_test.cc  | 23 +++++---
 .../writer/table_view/tsfile_writer_table_test.cc  |  1 +
 cpp/test/writer/tsfile_writer_test.cc              |  1 +
 20 files changed, 155 insertions(+), 88 deletions(-)

diff --git a/cpp/examples/c_examples/demo_read.c b/cpp/examples/c_examples/demo_read.c
index 8e4e7acf..d3c17a12 100644
--- a/cpp/examples/c_examples/demo_read.c
+++ b/cpp/examples/c_examples/demo_read.c
@@ -43,11 +43,11 @@ ERRNO read_tsfile() {
 
     // Get query result metadata: column name and datatype
     ResultSetMetaData metadata = tsfile_result_set_get_metadata(ret);
-    int column_num = metadata.column_num;
+    int column_num = tsfile_result_set_metadata_get_column_num(metadata);
 
-    for (int i = 0; i < column_num; i++) {
-        printf("column:%s, datatype:%d\n", metadata.column_names[i],
-               metadata.data_types[i]);
+    for (int i = 1; i <= column_num; i++) {
+        printf("column:%s, datatype:%d\n", tsfile_result_set_metadata_get_column_name(metadata, i),
+               tsfile_result_set_metadata_get_data_type(metadata, i));
     }
 
     // Get data by column name or index.
@@ -55,37 +55,37 @@ ERRNO read_tsfile() {
         // Timestamp at column 1 and column index begin from 1.
         Timestamp timestamp =
             tsfile_result_set_get_value_by_index_int64_t(ret, 1);
-        printf("%ld ", timestamp);
+        printf("%ld\n", timestamp);
         for (int i = 1; i <= column_num; i++) {
             if (tsfile_result_set_is_null_by_index(ret, i)) {
                 printf(" null ");
             } else {
-                switch (metadata.data_types[i]) {
+                switch (tsfile_result_set_metadata_get_data_type(metadata, i)) {
                     case TS_DATATYPE_BOOLEAN:
-                        printf("%d", tsfile_result_set_get_value_by_index_bool(
+                        printf("%d\n", tsfile_result_set_get_value_by_index_bool(
                                          ret, i));
                         break;
                     case TS_DATATYPE_INT32:
-                        printf("%d",
+                        printf("%d\n",
                                tsfile_result_set_get_value_by_index_int32_t(ret,
                                                                             i));
                         break;
                     case TS_DATATYPE_INT64:
-                        printf("%ld",
+                        printf("%ld\n",
                                tsfile_result_set_get_value_by_index_int64_t(ret,
                                                                             i));
                         break;
                     case TS_DATATYPE_FLOAT:
-                        printf("%f", tsfile_result_set_get_value_by_index_float(
+                        printf("%f\n", tsfile_result_set_get_value_by_index_float(
                                          ret, i));
                         break;
                     case TS_DATATYPE_DOUBLE:
-                        printf("%lf",
+                        printf("%lf\n",
                                tsfile_result_set_get_value_by_index_double(ret,
                                                                            i));
                         break;
                     case TS_DATATYPE_STRING:
-                        printf("%s",
+                        printf("%s\n",
                                tsfile_result_set_get_value_by_index_string(ret,
                                                                            i));
                         break;
diff --git a/cpp/examples/c_examples/demo_write.c b/cpp/examples/c_examples/demo_write.c
index fffe3f18..ebfc99df 100644
--- a/cpp/examples/c_examples/demo_write.c
+++ b/cpp/examples/c_examples/demo_write.c
@@ -54,6 +54,7 @@ ERRNO write_tsfile() {
                      .encoding = TS_ENCODING_PLAIN,
                      .column_category = FIELD};
 
+    remove("test_c.tsfile");
     // Create a file with specify path to write tsfile.
     WriteFile file = write_file_new("test_c.tsfile", &code);
     HANDLE_ERROR(code);
diff --git a/cpp/examples/cpp_examples/demo_read.cpp b/cpp/examples/cpp_examples/demo_read.cpp
index 4951e8ca..667fbfa1 100644
--- a/cpp/examples/cpp_examples/demo_read.cpp
+++ b/cpp/examples/cpp_examples/demo_read.cpp
@@ -54,7 +54,7 @@ int demo_read() {
     for (int i = 1; i <= column_num; i++) {
         std::cout << "column name: " << metadata->get_column_name(i)
                   << std::endl;
-        std::cout << "column type: " << metadata->get_column_type(i)
+        std::cout << "column type: " << std::to_string(metadata->get_column_type(i))
                   << std::endl;
     }
 
@@ -84,7 +84,7 @@ int demo_read() {
                         std::cout << ret->get_value<double>(i) << std::endl;
                         break;
                     case common::STRING:
-                        std::cout << ret->get_value<common::String*>(i)
+                        std::cout << *(ret->get_value<common::String*>(i))
                                   << std::endl;
                         break;
                     default:;
diff --git a/cpp/src/common/schema.h b/cpp/src/common/schema.h
index 45d76de9..e0e2b3b8 100644
--- a/cpp/src/common/schema.h
+++ b/cpp/src/common/schema.h
@@ -20,6 +20,8 @@
 #ifndef COMMON_SCHEMA_H
 #define COMMON_SCHEMA_H
 
+#include <writer/chunk_writer.h>
+
 #include <algorithm>
 #include <map>  // use unordered_map instead
 #include <memory>
@@ -75,6 +77,17 @@ struct MeasurementSchema {
           chunk_writer_(nullptr),
           value_chunk_writer_(nullptr) {}
 
+    ~MeasurementSchema() {
+        if (chunk_writer_ != nullptr) {
+            delete chunk_writer_;
+            chunk_writer_ = nullptr;
+        }
+        if (value_chunk_writer_ != nullptr) {
+            delete value_chunk_writer_;
+            value_chunk_writer_ = nullptr;
+        }
+    }
+
     int serialize_to(common::ByteStream &out) {
         int ret = common::E_OK;
         if (RET_FAIL(
diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h
index df88966c..8a77f1fe 100644
--- a/cpp/src/common/tsfile_common.h
+++ b/cpp/src/common/tsfile_common.h
@@ -124,11 +124,7 @@ struct ChunkHeader {
           chunk_type_(0) {}
 
     void reset() {
-        measurement_name_.clear();
         data_size_ = 0;
-        data_type_ = common::INVALID_DATATYPE;
-        compression_type_ = common::INVALID_COMPRESSION;
-        encoding_type_ = common::INVALID_ENCODING;
         num_of_pages_ = 0;
         serialized_size_ = 0;
         chunk_type_ = 0;
diff --git a/cpp/src/writer/chunk_writer.cc b/cpp/src/writer/chunk_writer.cc
index 888692fb..73618db7 100644
--- a/cpp/src/writer/chunk_writer.cc
+++ b/cpp/src/writer/chunk_writer.cc
@@ -69,6 +69,19 @@ void ChunkWriter::destroy() {
     num_of_pages_ = 0;
 }
 
+void ChunkWriter::reset() {
+    if (chunk_statistic_ != nullptr) {
+        chunk_statistic_->reset();
+    }
+    if (first_page_statistic_ != nullptr) {
+        first_page_statistic_->reset();
+    }
+    page_writer_.reset();
+    chunk_header_.reset();
+    chunk_data_.reset();
+    num_of_pages_ = 0;
+}
+
 int ChunkWriter::seal_cur_page(bool end_chunk) {
     int ret = E_OK;
     if (RET_FAIL(chunk_statistic_->merge_with(page_writer_.get_statistic()))) {
@@ -80,7 +93,7 @@ int ChunkWriter::seal_cur_page(bool end_chunk) {
             ret = page_writer_.write_to_chunk(chunk_data_, /*header*/ true,
                                               /*stat*/ false, /*data*/ true);
             page_writer_.destroy_page_data();
-            page_writer_.destroy();
+            page_writer_.reset();
         } else {
             /*
              * if the chunk has only one page, do not writer page statistic.
diff --git a/cpp/src/writer/chunk_writer.h b/cpp/src/writer/chunk_writer.h
index 6d80353f..7add7ebd 100644
--- a/cpp/src/writer/chunk_writer.h
+++ b/cpp/src/writer/chunk_writer.h
@@ -62,6 +62,7 @@ class ChunkWriter {
     int init(const std::string &measurement_name, common::TSDataType data_type,
              common::TSEncoding encoding,
              common::CompressionType compression_type);
+    void reset();
     void destroy();
 
     FORCE_INLINE int write(int64_t timestamp, bool value) {
diff --git a/cpp/src/writer/page_writer.cc b/cpp/src/writer/page_writer.cc
index ec8b2856..019004a0 100644
--- a/cpp/src/writer/page_writer.cc
+++ b/cpp/src/writer/page_writer.cc
@@ -115,9 +115,15 @@ int PageWriter::init(TSDataType data_type, TSEncoding encoding,
  * free out_stream memory, reset statistic_,
  */
 void PageWriter::reset() {
-    time_encoder_->reset();
-    value_encoder_->reset();
-    statistic_->reset();
+    if (time_encoder_ != nullptr) {
+        time_encoder_->reset();
+    }
+    if (value_encoder_ != nullptr) {
+        value_encoder_->reset();
+    }
+    if (statistic_ != nullptr) {
+        statistic_->reset();
+    }
     time_out_stream_.reset();
     value_out_stream_.reset();
 }
diff --git a/cpp/src/writer/time_chunk_writer.cc b/cpp/src/writer/time_chunk_writer.cc
index b65b856b..f5b7b240 100644
--- a/cpp/src/writer/time_chunk_writer.cc
+++ b/cpp/src/writer/time_chunk_writer.cc
@@ -51,6 +51,20 @@ int TimeChunkWriter::init(const std::string &measurement_name,
     return ret;
 }
 
+void TimeChunkWriter::reset() {
+    if (chunk_statistic_ != nullptr) {
+        chunk_statistic_->reset();
+    }
+    if (first_page_statistic_ != nullptr) {
+        first_page_statistic_->reset();
+    }
+    time_page_writer_.reset();
+    chunk_header_.reset();
+    chunk_data_.reset();
+    num_of_pages_ = 0;
+}
+
+
 void TimeChunkWriter::destroy() {
     if (num_of_pages_ == 1) {
         free_first_writer_data();
@@ -82,7 +96,7 @@ int TimeChunkWriter::seal_cur_page(bool end_chunk) {
                 time_page_writer_.write_to_chunk(chunk_data_, /*header*/ true,
                                                  /*stat*/ false, /*data*/ true);
             time_page_writer_.destroy_page_data();
-            time_page_writer_.destroy();
+            time_page_writer_.reset();
         } else {
             /*
              * if the chunk has only one page, do not writer page statistic.
diff --git a/cpp/src/writer/time_chunk_writer.h b/cpp/src/writer/time_chunk_writer.h
index d97a8aa9..8fcd9bd6 100644
--- a/cpp/src/writer/time_chunk_writer.h
+++ b/cpp/src/writer/time_chunk_writer.h
@@ -45,6 +45,7 @@ class TimeChunkWriter {
     int init(const common::ColumnSchema &col_schema);
     int init(const std::string &measurement_name, common::TSEncoding encoding,
              common::CompressionType compression_type);
+    void reset();
     void destroy();
 
     storage::ChunkHeader get_chunk_header() const { return chunk_header_; }
diff --git a/cpp/src/writer/time_page_writer.cc b/cpp/src/writer/time_page_writer.cc
index 49fe0fca..2ac75315 100644
--- a/cpp/src/writer/time_page_writer.cc
+++ b/cpp/src/writer/time_page_writer.cc
@@ -96,8 +96,12 @@ int TimePageWriter::init(TSEncoding encoding, CompressionType compression) {
 }
 
 void TimePageWriter::reset() {
-    time_encoder_->reset();
-    statistic_->reset();
+    if (time_encoder_ != nullptr) {
+        time_encoder_->reset();
+    }
+    if (statistic_ != nullptr) {
+        statistic_->reset();
+    }
     time_out_stream_.reset();
 }
 
@@ -110,6 +114,10 @@ void TimePageWriter::destroy() {
         EncoderFactory::free(time_encoder_);
         StatisticFactory::free(statistic_);
         CompressorFactory::free(compressor_);
+
+        time_encoder_ = nullptr;
+        statistic_ = nullptr;
+        compressor_ = nullptr;
     }
 }
 
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 88e2927d..f289f875 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -671,7 +671,8 @@ int TsFileWriter::write_tablet_aligned(const Tablet &tablet) {
         if (IS_NULL(value_chunk_writer)) {
             continue;
         }
-        value_write_column(value_chunk_writer, tablet, c);
+        value_write_column(value_chunk_writer, tablet, c, 0,
+                           tablet.get_cur_row_size());
     }
     return ret;
 }
@@ -716,7 +717,8 @@ int TsFileWriter::write_table(Tablet &tablet) {
     int start_idx = 0;
     for (auto &device_id_end_index_pair : device_id_end_index_pairs) {
         auto device_id = device_id_end_index_pair.first;
-        if (device_id_end_index_pair.second == 0) continue;
+        int end_idx = device_id_end_index_pair.second;
+        if (end_idx == 0) continue;
         if (table_aligned_) {
             SimpleVector<ValueChunkWriter *> value_chunk_writers;
             TimeChunkWriter *time_chunk_writer = nullptr;
@@ -725,7 +727,7 @@ int TsFileWriter::write_table(Tablet &tablet) {
                                                value_chunk_writers))) {
                 return ret;
             }
-            for (uint32_t i = 0; i < tablet.get_cur_row_size(); i++) {
+            for (int i = start_idx; i < end_idx; i++) {
                 time_chunk_writer->write(tablet.timestamps_[i]);
             }
             uint32_t field_col_count = 0;
@@ -737,10 +739,12 @@ int TsFileWriter::write_table(Tablet &tablet) {
                     if (IS_NULL(value_chunk_writer)) {
                         continue;
                     }
-                    value_write_column(value_chunk_writer, tablet, i);
+                    value_write_column(value_chunk_writer, tablet, i, start_idx,
+                                       end_idx);
                     field_col_count++;
                 }
             }
+            start_idx = end_idx;
         } else {
             MeasurementNamesFromTablet mnames_getter(tablet);
             SimpleVector<ChunkWriter *> chunk_writers;
@@ -846,28 +850,27 @@ int TsFileWriter::value_write_column(ValueChunkWriter *value_chunk_writer,
     int64_t *timestamps = tablet.timestamps_;
     Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx];
     BitMap &col_notnull_bitmap = tablet.bitmaps_[col_idx];
-    uint32_t row_count = tablet.max_row_num_;
 
     if (data_type == common::BOOLEAN) {
         ret = write_typed_column(value_chunk_writer, timestamps,
                                  (bool *)col_values.bool_data,
-                                 col_notnull_bitmap, row_count);
+                                 col_notnull_bitmap, start_idx, end_idx);
     } else if (data_type == common::INT32) {
         ret = write_typed_column(value_chunk_writer, timestamps,
                                  (int32_t *)col_values.int32_data,
-                                 col_notnull_bitmap, row_count);
+                                 col_notnull_bitmap, start_idx, end_idx);
     } else if (data_type == common::INT64) {
         ret = write_typed_column(value_chunk_writer, timestamps,
                                  (int64_t *)col_values.int64_data,
-                                 col_notnull_bitmap, row_count);
+                                 col_notnull_bitmap, start_idx, end_idx);
     } else if (data_type == common::FLOAT) {
         ret = write_typed_column(value_chunk_writer, timestamps,
                                  (float *)col_values.float_data,
-                                 col_notnull_bitmap, row_count);
+                                 col_notnull_bitmap, start_idx, end_idx);
     } else if (data_type == common::DOUBLE) {
         ret = write_typed_column(value_chunk_writer, timestamps,
                                  (double *)col_values.double_data,
-                                 col_notnull_bitmap, row_count);
+                                 col_notnull_bitmap, start_idx, end_idx);
     } else {
         return E_NOT_SUPPORT;
     }
@@ -888,7 +891,7 @@ int TsFileWriter::value_write_column(ValueChunkWriter *value_chunk_writer,
 #define DO_VALUE_WRITE_TYPED_COLUMN()                                         \
     do {                                                                      \
         int ret = E_OK;                                                       \
-        for (uint32_t r = 0; r < row_count; r++) {                            \
+        for (uint32_t r = start_idx; r < end_idx; r++) {                      \
             if (LIKELY(col_notnull_bitmap.test(r))) {                         \
                 ret = value_chunk_writer->write(timestamps[r], col_values[r], \
                                                 true);                        \
@@ -946,35 +949,35 @@ int TsFileWriter::write_typed_column(ChunkWriter *chunk_writer,
 int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
                                      int64_t *timestamps, bool *col_values,
                                      BitMap &col_notnull_bitmap,
-                                     uint32_t row_count) {
+                                     uint32_t start_idx, uint32_t end_idx) {
     DO_VALUE_WRITE_TYPED_COLUMN();
 }
 
 int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
                                      int64_t *timestamps, int32_t *col_values,
                                      BitMap &col_notnull_bitmap,
-                                     uint32_t row_count) {
+                                     uint32_t start_idx, uint32_t end_idx) {
     DO_VALUE_WRITE_TYPED_COLUMN();
 }
 
 int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
                                      int64_t *timestamps, int64_t *col_values,
                                      BitMap &col_notnull_bitmap,
-                                     uint32_t row_count) {
+                                     uint32_t start_idx, uint32_t end_idx) {
     DO_VALUE_WRITE_TYPED_COLUMN();
 }
 
 int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
                                      int64_t *timestamps, float *col_values,
                                      BitMap &col_notnull_bitmap,
-                                     uint32_t row_count) {
+                                     uint32_t start_idx, uint32_t end_idx) {
     DO_VALUE_WRITE_TYPED_COLUMN();
 }
 
 int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
                                      int64_t *timestamps, double *col_values,
                                      BitMap &col_notnull_bitmap,
-                                     uint32_t row_count) {
+                                     uint32_t start_idx, uint32_t end_idx) {
     DO_VALUE_WRITE_TYPED_COLUMN();
 }
 
@@ -993,25 +996,6 @@ int TsFileWriter::flush() {
     DeviceSchemasMapIter device_iter;
     for (device_iter = schemas_.begin(); device_iter != schemas_.end();
          device_iter++) {  // cppcheck-suppress postfixOperator
-        if (device_iter->second->is_aligned_) {
-            SimpleVector<ValueChunkWriter *> value_chunk_writers;
-            TimeChunkWriter *time_chunk_writer;
-            MeasurementSchemaMapNamesGetter mnames_getter(
-                device_iter->second->measurement_schema_map_);
-            if (RET_FAIL(do_check_schema_aligned(
-                    device_iter->first, mnames_getter, time_chunk_writer,
-                    value_chunk_writers))) {
-                return ret;
-            }
-        } else {
-            SimpleVector<ChunkWriter *> chunk_writers;
-            MeasurementSchemaMapNamesGetter mnames_getter(
-                device_iter->second->measurement_schema_map_);
-            if (RET_FAIL(do_check_schema(device_iter->first, mnames_getter,
-                                         chunk_writers))) {
-                return ret;
-            }
-        }
         if (check_chunk_group_empty(device_iter->second,
                                     device_iter->second->is_aligned_)) {
             continue;
@@ -1062,9 +1046,7 @@ bool TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup *chunk_group,
     } else if (RET_FAIL(io_writer->end_flush_chunk(                            \
                    writer->get_chunk_statistic()))) {                          \
     } else {                                                                   \
-        writer->destroy();                                                     \
-        delete writer;                                                         \
-        writer = nullptr;                                                      \
+        writer->reset();                                                       \
     }
 
 int TsFileWriter::flush_chunk_group(MeasurementSchemaGroup *chunk_group,
@@ -1084,13 +1066,13 @@ int TsFileWriter::flush_chunk_group(MeasurementSchemaGroup *chunk_group,
     for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end();
          ms_iter++) {
         MeasurementSchema *m_schema = ms_iter->second;
-        if (!chunk_group->is_aligned_) {
+        if (!chunk_group->is_aligned_ && m_schema->chunk_writer_ != nullptr) {
             ChunkWriter *&chunk_writer = m_schema->chunk_writer_;
             FLUSH_CHUNK(chunk_writer, io_writer_, m_schema->measurement_name_,
                         m_schema->data_type_, m_schema->encoding_,
                         m_schema->compression_type_,
                         chunk_writer->num_of_pages())
-        } else {
+        } else if (m_schema->value_chunk_writer_ != nullptr) {
             ValueChunkWriter *&value_chunk_writer =
                 m_schema->value_chunk_writer_;
             FLUSH_CHUNK(value_chunk_writer, io_writer_,
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h
index bfaf70f0..1d2368fe 100644
--- a/cpp/src/writer/tsfile_writer.h
+++ b/cpp/src/writer/tsfile_writer.h
@@ -182,36 +182,36 @@ class TsFileWriter {
     int write_typed_column(ValueChunkWriter *value_chunk_writer,
                            int64_t *timestamps, bool *col_values,
                            common::BitMap &col_notnull_bitmap,
-                           uint32_t row_count);
+                           uint32_t start_idx, uint32_t end_idx);
 
     int write_typed_column(ValueChunkWriter *value_chunk_writer,
                            int64_t *timestamps, double *col_values,
                            common::BitMap &col_notnull_bitmap,
-                           uint32_t row_count);
+                           uint32_t start_idx, uint32_t end_idx);
     int write_typed_column(ValueChunkWriter *value_chunk_writer,
                            int64_t *timestamps, common::String *col_values,
                            common::BitMap &col_notnull_bitmap,
-                           int32_t row_count);
+                           uint32_t start_idx, uint32_t end_idx);
 
     int write_typed_column(ValueChunkWriter *value_chunk_writer,
                            int64_t *timestamps, float *col_values,
                            common::BitMap &col_notnull_bitmap,
-                           uint32_t row_count);
+                           uint32_t start_idx, uint32_t end_idx);
 
     int write_typed_column(ValueChunkWriter *value_chunk_writer,
                            int64_t *timestamps, int32_t *col_values,
                            common::BitMap &col_notnull_bitmap,
-                           uint32_t row_count);
+                           uint32_t start_idx, uint32_t end_idx);
 
     int write_typed_column(ValueChunkWriter *value_chunk_writer,
                            int64_t *timestamps, int64_t *col_values,
                            common::BitMap &col_notnull_bitmap,
-                           uint32_t row_count);
+                           uint32_t start_idx, uint32_t end_idx);
 
     int value_write_column(ValueChunkWriter *value_chunk_writer,
                            const Tablet &tablet, int col_idx,
-                           uint32_t start_idx = 0,
-                           uint32_t end_idx = UINT32_MAX);
+                           uint32_t start_idx,
+                           uint32_t end_idx);
 };
 
 }  // end namespace storage
diff --git a/cpp/src/writer/value_chunk_writer.cc b/cpp/src/writer/value_chunk_writer.cc
index 6c23cdad..e29f2565 100644
--- a/cpp/src/writer/value_chunk_writer.cc
+++ b/cpp/src/writer/value_chunk_writer.cc
@@ -52,6 +52,20 @@ int ValueChunkWriter::init(const std::string &measurement_name,
     return ret;
 }
 
+void ValueChunkWriter::reset() {
+    if (chunk_statistic_ != nullptr) {
+        chunk_statistic_->reset();
+    }
+    if (first_page_statistic_ != nullptr) {
+        first_page_statistic_->reset();
+    }
+    value_page_writer_.reset();
+    chunk_header_.reset();
+    chunk_data_.reset();
+    num_of_pages_ = 0;
+}
+
+
 void ValueChunkWriter::destroy() {
     if (num_of_pages_ == 1) {
         free_first_writer_data();
@@ -83,7 +97,7 @@ int ValueChunkWriter::seal_cur_page(bool end_chunk) {
                 chunk_data_, /*header*/ true,
                 /*stat*/ false, /*data*/ true);
             value_page_writer_.destroy_page_data();
-            value_page_writer_.destroy();
+            value_page_writer_.reset();
         } else {
             /*
              * if the chunk has only one page, do not writer page statistic.
diff --git a/cpp/src/writer/value_chunk_writer.h b/cpp/src/writer/value_chunk_writer.h
index 52581a34..a3e34239 100644
--- a/cpp/src/writer/value_chunk_writer.h
+++ b/cpp/src/writer/value_chunk_writer.h
@@ -62,6 +62,7 @@ class ValueChunkWriter {
     int init(const std::string &measurement_name, common::TSDataType data_type,
              common::TSEncoding encoding,
              common::CompressionType compression_type);
+    void reset();
     void destroy();
 
     FORCE_INLINE int write(int64_t timestamp, bool value, bool isnull) {
diff --git a/cpp/src/writer/value_page_writer.cc b/cpp/src/writer/value_page_writer.cc
index 76d820f0..b95307f7 100644
--- a/cpp/src/writer/value_page_writer.cc
+++ b/cpp/src/writer/value_page_writer.cc
@@ -111,8 +111,12 @@ int ValuePageWriter::init(TSDataType data_type, TSEncoding encoding,
 }
 
 void ValuePageWriter::reset() {
-    value_encoder_->reset();
-    statistic_->reset();
+    if (value_encoder_ != nullptr) {
+        value_encoder_->reset();
+    }
+    if (statistic_ != nullptr) {
+        statistic_->reset();
+    }
     col_notnull_bitmap_out_stream_.reset();
     value_out_stream_.reset();
 }
@@ -126,6 +130,10 @@ void ValuePageWriter::destroy() {
         EncoderFactory::free(value_encoder_);
         StatisticFactory::free(statistic_);
         CompressorFactory::free(compressor_);
+
+        value_encoder_ = nullptr;
+        statistic_ = nullptr;
+        compressor_ = nullptr;
     }
 }
 
diff --git a/cpp/test/common/tsfile_common_test.cc b/cpp/test/common/tsfile_common_test.cc
index be8c2ce5..e5eebd49 100644
--- a/cpp/test/common/tsfile_common_test.cc
+++ b/cpp/test/common/tsfile_common_test.cc
@@ -64,11 +64,11 @@ TEST(ChunkHeaderTest, Reset) {
   header.chunk_type_ = 1;
 
   header.reset();
-  EXPECT_EQ(header.measurement_name_, "");
+  EXPECT_EQ(header.measurement_name_, "test");
   EXPECT_EQ(header.data_size_, 0);
-  EXPECT_EQ(header.data_type_, common::INVALID_DATATYPE);
-  EXPECT_EQ(header.compression_type_, common::INVALID_COMPRESSION);
-  EXPECT_EQ(header.encoding_type_, common::INVALID_ENCODING);
+  EXPECT_EQ(header.data_type_, common::INT32);
+  EXPECT_EQ(header.compression_type_, common::SNAPPY);
+  EXPECT_EQ(header.encoding_type_, common::PLAIN);
   EXPECT_EQ(header.num_of_pages_, 0);
   EXPECT_EQ(header.serialized_size_, 0);
   EXPECT_EQ(header.chunk_type_, 0);
diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
index b1d7c257..33e9efa8 100644
--- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc
+++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
@@ -135,12 +135,12 @@ class TsFileTableReaderTest : public ::testing::Test {
         return tablet;
     }
 
-    void test_table_model_query(uint32_t points_per_device = 10) {
+    void test_table_model_query(uint32_t points_per_device = 10, uint32_t device_num = 1) {
         auto table_schema = gen_table_schema(0);
         auto tsfile_table_writer_ =
             std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
 
-        auto tablet = gen_tablet(table_schema, 0, 1, points_per_device);
+        auto tablet = gen_tablet(table_schema, 0, device_num, points_per_device);
         ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK);
         ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK);
         ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK);
@@ -157,7 +157,7 @@ class TsFileTableReaderTest : public ::testing::Test {
         std::strcpy(literal, "device_id");
         String literal_str(literal, std::strlen("device_id"));
         bool has_next = false;
-        int64_t timestamp = 0;
+        int64_t row_num = 0;
         while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
             auto column_schemas = table_schema->get_measurement_schemas();
             for (const auto& column_schema : column_schemas) {
@@ -165,7 +165,7 @@ class TsFileTableReaderTest : public ::testing::Test {
                     case TSDataType::INT64:
                         ASSERT_EQ(table_result_set->get_value<int64_t>(
                                       column_schema->measurement_name_),
-                                  0);
+                                  (row_num / points_per_device) % device_num);
                         break;
                     case TSDataType::STRING:
                         ASSERT_EQ(table_result_set
@@ -185,12 +185,12 @@ class TsFileTableReaderTest : public ::testing::Test {
                     0);
             }
             for (int i = 7; i <= 11; i++) {
-                ASSERT_EQ(table_result_set->get_value<int64_t>(i), 0);
+                ASSERT_EQ(table_result_set->get_value<int64_t>(i),  (row_num / points_per_device) % device_num);
             }
-            ASSERT_EQ(table_result_set->get_value<int64_t>(1), timestamp);
-            timestamp++;
+            ASSERT_EQ(table_result_set->get_value<int64_t>(1), row_num % points_per_device);
+            row_num++;
         }
-        ASSERT_EQ(timestamp, points_per_device);
+        ASSERT_EQ(row_num, points_per_device * device_num);
         reader.destroy_query_data_set(table_result_set);
         delete[] literal;
         ASSERT_EQ(reader.close(), common::E_OK);
@@ -221,6 +221,13 @@ TEST_F(TsFileTableReaderTest, TableModelQueryMultiLargePage) {
     g_config_value_.page_writer_max_point_num_ = prev_config;
 }
 
+TEST_F(TsFileTableReaderTest, TableModelQueryMultiDevices) {
+    int prev_config = g_config_value_.page_writer_max_point_num_;
+    g_config_value_.page_writer_max_point_num_ = 10000;
+    test_table_model_query(g_config_value_.page_writer_max_point_num_, 10);
+    g_config_value_.page_writer_max_point_num_ = prev_config;
+}
+
 TEST_F(TsFileTableReaderTest, TableModelResultMetadata) {
     auto table_schema = gen_table_schema(0);
     auto tsfile_table_writer_ =
diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
index 02b28b0f..a616b7fa 100644
--- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
+++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
@@ -212,6 +212,7 @@ TEST_F(TsFileWriterTableTest, DISABLED_WriteAndReadSimple) {
     ResultSet* ret = nullptr;
     int ret_value =
         reader.query("test_table", {"device", "value"}, 10, 50, ret);
+    ASSERT_EQ(common::E_OK, ret_value);
 
     auto* table_result_set = (TableResultSet*)ret;
     bool has_next = false;
diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc
index 3f99971f..dcd96db3 100644
--- a/cpp/test/writer/tsfile_writer_test.cc
+++ b/cpp/test/writer/tsfile_writer_test.cc
@@ -439,6 +439,7 @@ TEST_F(TsFileWriterTest, WriteMultipleTabletsAlignedMultiFlush) {
     storage::ResultSet *tmp_qds = nullptr;
 
     ret = reader.query(query_expr, tmp_qds);
+    ASSERT_EQ(ret, common::E_OK);
     auto *qds = (QDSWithoutTimeGenerator *)tmp_qds;
 
     storage::RowRecord *record;

Reply via email to