This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch iotdb in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 425fb467091bca150d3b36c585a15069bc67aa08 Author: RkGrit <[email protected]> AuthorDate: Tue Jul 2 17:01:44 2024 +0800 feature: finish multiple flush for c++_tsfile (#110) * feature: finish multiple flush for c++_tsfile * rewrite some code and add some comments * add multi-flush for write_tablet() --- cpp/src/common/config/config.h | 2 + cpp/src/common/global.cc | 2 + cpp/src/common/tsfile_common.h | 9 +++ cpp/src/encoding/bitpack_encoder.h | 43 ++++++++------ cpp/src/encoding/dictionary_encoder.h | 6 ++ cpp/src/encoding/encoder.h | 8 +++ cpp/src/encoding/gorilla_encoder.h | 6 +- cpp/src/encoding/plain_encoder.h | 8 +-- cpp/src/encoding/ts2diff_encoder.h | 5 ++ cpp/src/file/tsfile_io_writer.cc | 2 - cpp/src/file/tsfile_io_writer.h | 4 +- cpp/src/writer/chunk_writer.cc | 9 +++ cpp/src/writer/chunk_writer.h | 2 + cpp/src/writer/page_writer.h | 12 ++++ cpp/src/writer/tsfile_writer.cc | 104 ++++++++++++++++++++++++++++------ cpp/src/writer/tsfile_writer.h | 8 ++- 16 files changed, 183 insertions(+), 47 deletions(-) diff --git a/cpp/src/common/config/config.h b/cpp/src/common/config/config.h index 679a69b0..d2589229 100644 --- a/cpp/src/common/config/config.h +++ b/cpp/src/common/config/config.h @@ -49,6 +49,8 @@ typedef struct ConfigValue { const char *tsfile_prefix_path_; TSEncoding time_encoding_type_; uint32_t memtable_flusher_poll_interval_seconds_; + int32_t chunk_group_size_threshold_; + int32_t record_count_for_next_mem_check_; } ConfigValue; extern void init_config_value(); diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc index b54d4499..dfbbfc81 100644 --- a/cpp/src/common/global.cc +++ b/cpp/src/common/global.cc @@ -52,6 +52,8 @@ void init_config_value() { g_config_value_.page_writer_max_memory_bytes_ = 128 * 1024; // 128 k g_config_value_.max_degree_of_index_node_ = 256; g_config_value_.tsfile_index_bloom_filter_error_percent_ = 0.05; + g_config_value_.record_count_for_next_mem_check_ = 100; + g_config_value_.chunk_group_size_threshold_ = 128 * 1024 * 1024; // g_config_value_.tsfile_prefix_path_ = "./data"; g_config_value_.tsfile_prefix_path_ = ""; // g_config_value_.time_encoding_type_ = TS_2DIFF; diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h index a1920e25..601d5886 100644 --- a/cpp/src/common/tsfile_common.h +++ b/cpp/src/common/tsfile_common.h @@ -86,6 +86,15 @@ struct PageHeader { } return ret; } + + /** max page header size without statistics. */ + static int estimat_max_page_header_size_without_statistics() { + // uncompressedSize, compressedSize + // because we use unsigned varInt to encode these two integer, each + // unsigned varInt will cost at most 5 bytes + return 2 * (4 + 1); + } + #ifndef NDEBUG friend std::ostream &operator<<(std::ostream &os, const PageHeader &h) { os << "{uncompressed_size_=" << h.uncompressed_size_ diff --git a/cpp/src/encoding/bitpack_encoder.h b/cpp/src/encoding/bitpack_encoder.h index 85a385bf..708e00a7 100644 --- a/cpp/src/encoding/bitpack_encoder.h +++ b/cpp/src/encoding/bitpack_encoder.h @@ -58,13 +58,12 @@ class BitPackEncoder { packer_ = nullptr; } - void destroy() { /* do nothing for BitPackEncoder */ - delete (packer_); - } + void destroy() { delete (packer_); } void reset() { num_buffered_values_ = 0; bitpacked_group_count_ = 0; + bit_width_ = 0; bytes_buffer_.clear(); byte_cache_.reset(); values_.clear(); @@ -74,11 +73,13 @@ class BitPackEncoder { FORCE_INLINE void encode(int64_t value, common::ByteStream &out) { values_.push_back(value); + int current_bit_width = 32 - number_of_leading_zeros(value); + if (current_bit_width > bit_width_) { + bit_width_ = current_bit_width; + } } void encode_flush(common::ByteStream &out) { - // we get bit width after receiving all data - bit_width_ = get_int_max_bit_width(values_); ASSERT(packer_ == nullptr); packer_ = new IntPacker(bit_width_); common::SerializationUtil::write_i8(bit_width_, byte_cache_); @@ -121,19 +122,6 @@ class BitPackEncoder { common::mem_free(bytes); } - int get_int_max_bit_width(std::vector<int64_t> values) { - // TODO: Optimization - find the maximum value first, and then calcuate - // the bit width - int max = 1; - for (size_t i = 0; i < values.size(); i++) { - int bitWidth = 64 - number_of_leading_zeros(values[i]); - if (bitWidth > max) { - max = bitWidth; - } - } - return max; - } - void flush(common::ByteStream &out) { int last_bitpacked_num = num_buffered_values_; if (num_buffered_values_ > 0) { @@ -167,6 +155,25 @@ class BitPackEncoder { bytes_buffer_.clear(); bitpacked_group_count_ = 0; } + + int get_max_byte_size() { + if (values_.empty()) { + return 0; + } + int totalValues = values_.size(); + int fullGroups = totalValues / 8; + int remainingValues = totalValues % 8; + int bytesPerGroup = (bit_width_ * 8 + 7) / 8; + int maxSize = 0; + maxSize += fullGroups * bytesPerGroup; + if (remainingValues > 0) { + maxSize += bytesPerGroup; + } + + // Add additional bytes, because each bitpack group has a header of 1 byte and a tail of 1 byte. + maxSize += fullGroups * (1 + 1) + (remainingValues > 0 ? (1 + 1) : 0); + return maxSize; + } }; } // end namespace storage diff --git a/cpp/src/encoding/dictionary_encoder.h b/cpp/src/encoding/dictionary_encoder.h index 73c6a6f5..9ea3f6f0 100644 --- a/cpp/src/encoding/dictionary_encoder.h +++ b/cpp/src/encoding/dictionary_encoder.h @@ -95,6 +95,12 @@ class DictionaryEncoder { void write_encoded_data(common::ByteStream &out) { values_encoder_.encode_flush(out); } + + int get_max_byte_size() + { + // 4 bytes for storing dictionary size + return 4 + map_size_ + values_encoder_.get_max_byte_size(); + } }; } // end namespace storage diff --git a/cpp/src/encoding/encoder.h b/cpp/src/encoding/encoder.h index 99e5146a..9b52cfd4 100644 --- a/cpp/src/encoding/encoder.h +++ b/cpp/src/encoding/encoder.h @@ -38,6 +38,14 @@ class Encoder { virtual int encode(float value, common::ByteStream &out_stream) = 0; virtual int encode(double value, common::ByteStream &out_stream) = 0; virtual int flush(common::ByteStream &out_stream) = 0; + + /** + * The maximal possible memory size occupied by current Encoder. This + * statistic value doesn't involve OutputStream. + * + * @return the maximal size of possible memory occupied by current encoder + */ + virtual int get_max_byte_size() = 0; }; } // end namespace storage diff --git a/cpp/src/encoding/gorilla_encoder.h b/cpp/src/encoding/gorilla_encoder.h index 2ae580cd..47cedf18 100644 --- a/cpp/src/encoding/gorilla_encoder.h +++ b/cpp/src/encoding/gorilla_encoder.h @@ -119,7 +119,7 @@ class GorillaEncoder : public Encoder { } } - int get_one_item_max_size(); + int get_max_byte_size(); void write_first(T value, common::ByteStream &out); void write_existing_leading(T xor_value, common::ByteStream &out); void write_new_leading(T xor_value, int leading_zeros, int trailing_zeros, @@ -145,11 +145,11 @@ class GorillaEncoder : public Encoder { }; template <> -FORCE_INLINE int GorillaEncoder<int32_t>::get_one_item_max_size() { +FORCE_INLINE int GorillaEncoder<int32_t>::get_max_byte_size() { return INT32_ONE_ITEM_MAX_SIZE; } template <> -FORCE_INLINE int GorillaEncoder<int64_t>::get_one_item_max_size() { +FORCE_INLINE int GorillaEncoder<int64_t>::get_max_byte_size() { return INT64_ONE_ITEM_MAX_SIZE; } diff --git a/cpp/src/encoding/plain_encoder.h b/cpp/src/encoding/plain_encoder.h index 556aea97..ab1e06f6 100644 --- a/cpp/src/encoding/plain_encoder.h +++ b/cpp/src/encoding/plain_encoder.h @@ -28,10 +28,8 @@ class PlainEncoder : public Encoder { public: PlainEncoder() {} ~PlainEncoder() { destroy(); } - void destroy() { /* do nothing for PlainEncoder */ - } - void reset() { /* do thing for PlainEncoder */ - } + void destroy() { /* do nothing for PlainEncoder */ } + void reset() { /* do thing for PlainEncoder */ } FORCE_INLINE int encode(bool value, common::ByteStream &out_stream) { return common::SerializationUtil::write_i8(value ? 1 : 0, out_stream); @@ -57,6 +55,8 @@ class PlainEncoder : public Encoder { // do nothing for PlainEncoder return common::E_OK; } + + int get_max_byte_size() { return 0; } }; } // end namespace storage diff --git a/cpp/src/encoding/ts2diff_encoder.h b/cpp/src/encoding/ts2diff_encoder.h index 081c7fa7..67d2c83d 100644 --- a/cpp/src/encoding/ts2diff_encoder.h +++ b/cpp/src/encoding/ts2diff_encoder.h @@ -112,6 +112,11 @@ class TS2DIFFEncoder : public Encoder { int flush(common::ByteStream &out_stream); + int get_max_byte_size() { + // The meaning of 24 is: index(4)+width(4)+minDeltaBase(8)+firstValue(8) + return 24 + write_index_ * 8; + } + public: int block_size_; T *delta_arr_; diff --git a/cpp/src/file/tsfile_io_writer.cc b/cpp/src/file/tsfile_io_writer.cc index ac1e1aa1..3880116e 100644 --- a/cpp/src/file/tsfile_io_writer.cc +++ b/cpp/src/file/tsfile_io_writer.cc @@ -168,8 +168,6 @@ int TsFileIOWriter::flush_chunk(ByteStream &chunk_data) { } else if (RET_FAIL(flush_stream_to_file())) { // log_err("flush stream error, ret=%d", ret); } - - chunk_data.destroy(); return ret; } diff --git a/cpp/src/file/tsfile_io_writer.h b/cpp/src/file/tsfile_io_writer.h index db594a12..092c543f 100644 --- a/cpp/src/file/tsfile_io_writer.h +++ b/cpp/src/file/tsfile_io_writer.h @@ -97,8 +97,8 @@ class TsFileIOWriter { int end_flush_chunk_group(); int end_file(); - FORCE_INLINE std::vector<TimeseriesTimeIndexEntry> - &get_ts_time_index_vector() { + FORCE_INLINE std::vector<TimeseriesTimeIndexEntry> & + get_ts_time_index_vector() { return ts_time_index_vector_; } FORCE_INLINE std::string get_file_path() { return file_->get_file_path(); } diff --git a/cpp/src/writer/chunk_writer.cc b/cpp/src/writer/chunk_writer.cc index 51c78af5..a283fb96 100644 --- a/cpp/src/writer/chunk_writer.cc +++ b/cpp/src/writer/chunk_writer.cc @@ -63,6 +63,7 @@ void ChunkWriter::destroy() { } chunk_data_.destroy(); chunk_header_.reset(); + num_of_pages_ = 0; } int ChunkWriter::seal_cur_page(bool end_chunk) { @@ -151,4 +152,12 @@ int ChunkWriter::end_encode_chunk() { return ret; } + +int64_t ChunkWriter::estimate_max_series_mem_size(){ + return chunk_data_.total_size() + + page_writer_.estimate_max_mem_size() + + PageHeader::estimat_max_page_header_size_without_statistics() + + get_typed_statistic_sizeof(page_writer_.get_statistic()->get_type()); +} + } // end namespace storage diff --git a/cpp/src/writer/chunk_writer.h b/cpp/src/writer/chunk_writer.h index c181ce98..54d51f54 100644 --- a/cpp/src/writer/chunk_writer.h +++ b/cpp/src/writer/chunk_writer.h @@ -93,6 +93,8 @@ class ChunkWriter { return false; } + int64_t estimate_max_series_mem_size(); + private: FORCE_INLINE bool is_cur_page_full() const { // FIXME diff --git a/cpp/src/writer/page_writer.h b/cpp/src/writer/page_writer.h index 98b25179..6800c7ea 100644 --- a/cpp/src/writer/page_writer.h +++ b/cpp/src/writer/page_writer.h @@ -134,6 +134,18 @@ class PageWriter { FORCE_INLINE uint32_t get_page_memory_size() const { return time_out_stream_.total_size() + value_out_stream_.total_size(); } + /** + * calculate max possible memory size it occupies, including time + * outputStream and value outputStream, because size outputStream is never + * used until flushing. + * + * @return allocated size in time, value and outputStream + */ + FORCE_INLINE uint32_t estimate_max_mem_size() const { + return time_out_stream_.total_size() + value_out_stream_.total_size() + + time_encoder_->get_max_byte_size() + + value_encoder_->get_max_byte_size(); + } int write_to_chunk(common::ByteStream &pages_data, bool write_header, bool write_statistic, bool write_data_to_chunk_data); FORCE_INLINE common::ByteStream &get_time_data() { diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 37fe6098..7d5db9f0 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -60,10 +60,10 @@ TsFileWriter::TsFileWriter() : write_file_(nullptr), io_writer_(nullptr), schemas_(), - start_file_done_(false), - write_file_created_(false) { - // do nothing. -} + record_count_since_last_flush_(0), + record_count_for_next_mem_check_( + g_config_value_.record_count_for_next_mem_check_), + write_file_created_(false) {} TsFileWriter::~TsFileWriter() { destroy(); } @@ -94,6 +94,7 @@ void TsFileWriter::destroy() { delete dev_iter->second; } schemas_.clear(); + record_count_since_last_flush_ = 0; } int TsFileWriter::init(WriteFile *write_file) { @@ -124,6 +125,12 @@ int TsFileWriter::open(const std::string &file_path, int flags, mode_t mode) { if (RET_FAIL(write_file_->create(file_path, flags, mode))) { } else { io_writer_->init(write_file_); + if (RET_FAIL(io_writer_->start_file())) { + return ret; + } +#if DEBUG_SE + std::cout << "finish writing magic code" << std::endl; +#endif } return ret; } @@ -202,9 +209,7 @@ struct MeasurementNamesFromTablet { template <typename MeasurementNamesGetter> int TsFileWriter::do_check_schema(const std::string &device_name, MeasurementNamesGetter &measurement_names, - SimpleVector<ChunkWriter *> &chunk_writers) -// std::vector<ChunkWriter*> &chunk_writers) -{ + SimpleVector<ChunkWriter *> &chunk_writers) { int ret = E_OK; DeviceSchemaIter dev_it = schemas_.find(device_name); MeasurementSchemaGroup *device_schema = NULL; @@ -220,8 +225,8 @@ int TsFileWriter::do_check_schema(const std::string &device_name, if (UNLIKELY(ms_iter == msm.end())) { chunk_writers.push_back(NULL); } else { - // Here we may check data_type against ms_iter. But in Java - // libtsfile, no check here. + // In Java we will check data_type. But in C++, no check here. + // Because checks are performed at the chunk layer and page layer MeasurementSchema *ms = ms_iter->second; if (IS_NULL(ms->chunk_writer_)) { ms->chunk_writer_ = new ChunkWriter; @@ -242,6 +247,43 @@ int TsFileWriter::do_check_schema(const std::string &device_name, return ret; } +int64_t TsFileWriter::calculate_mem_size_for_all_group() { + int64_t mem_total_size = 0; + DeviceSchemaIter device_iter; + for (device_iter = schemas_.begin(); device_iter != schemas_.end(); + device_iter++) { + MeasurementSchemaGroup *chunk_group = device_iter->second; + MeasurementSchemaMap &map = chunk_group->measurement_schema_map_; + for (MeasurementSchemaMapIter ms_iter = map.begin(); + ms_iter != map.end(); ms_iter++) { + MeasurementSchema *m_schema = ms_iter->second; + ChunkWriter *&chunk_writer = m_schema->chunk_writer_; + if (chunk_writer != NULL) { + mem_total_size += chunk_writer->estimate_max_series_mem_size(); + } + } + } + return mem_total_size; +} + +/** + * check occupied memory size, if it exceeds the chunkGroupSize threshold, flush + * them to given OutputStream. + */ +int TsFileWriter::check_memory_size_and_may_flush_chunks() { + int ret = E_OK; + if (record_count_since_last_flush_ >= record_count_for_next_mem_check_) { + int64_t mem_size = calculate_mem_size_for_all_group(); + record_count_for_next_mem_check_ = + record_count_since_last_flush_ * + common::g_config_value_.chunk_group_size_threshold_ / mem_size; + if (mem_size > common::g_config_value_.chunk_group_size_threshold_) { + ret = flush(); + } + } + return ret; +} + int TsFileWriter::write_record(const TsRecord &record) { int ret = E_OK; // std::vector<ChunkWriter*> chunk_writers; @@ -261,6 +303,9 @@ int TsFileWriter::write_record(const TsRecord &record) { // ignore point writer failure write_point(chunk_writer, record.timestamp_, record.points_[c]); } + + record_count_since_last_flush_++; + ret = check_memory_size_and_may_flush_chunks(); return ret; } @@ -303,6 +348,9 @@ int TsFileWriter::write_tablet(const Tablet &tablet) { // ignore writer failure write_column(chunk_writer, tablet, c); } + + record_count_since_last_flush_ += tablet.max_rows_; + ret = check_memory_size_and_may_flush_chunks(); return ret; } @@ -381,27 +429,44 @@ int TsFileWriter::write_typed_column(ChunkWriter *chunk_writer, // TODO make sure ret is meaningful to SDK user int TsFileWriter::flush() { int ret = E_OK; - if (!start_file_done_) { - if (RET_FAIL(io_writer_->start_file())) { - return ret; - } - start_file_done_ = true; - } - std::cout << "finish writing magic code" << std::endl; /* since @schemas_ used std::map which is rbtree underlying, so map itself is ordered by device name. */ std::map<std::string, MeasurementSchemaGroup *>::iterator device_iter; for (device_iter = schemas_.begin(); device_iter != schemas_.end(); - device_iter++) { // cppcheck-suppress postfixOperator + device_iter++) { + if (check_chunk_group_empty(device_iter->second)) { + continue; + } + if (RET_FAIL(io_writer_->start_flush_chunk_group(device_iter->first))) { + return ret; } else if (RET_FAIL(flush_chunk_group(device_iter->second))) { + return ret; } else if (RET_FAIL(io_writer_->end_flush_chunk_group())) { + return ret; } } + record_count_since_last_flush_ = 0; return ret; } +bool TsFileWriter::check_chunk_group_empty( + MeasurementSchemaGroup *chunk_group) { + MeasurementSchemaMap &map = chunk_group->measurement_schema_map_; + for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end(); + ms_iter++) { + MeasurementSchema *m_schema = ms_iter->second; + if (m_schema->chunk_writer_ != NULL && + m_schema->chunk_writer_->num_of_pages() > 0) { + // first condition is to avoid first flush empty chunk group + // second condition is to avoid repeated flush + return false; + } + } + return true; +} + int TsFileWriter::flush_chunk_group(MeasurementSchemaGroup *chunk_group) { int ret = E_OK; MeasurementSchemaMap &map = chunk_group->measurement_schema_map_; @@ -415,10 +480,15 @@ int TsFileWriter::flush_chunk_group(MeasurementSchemaGroup *chunk_group) { m_schema->measurement_name_, m_schema->data_type_, m_schema->encoding_, m_schema->compression_type_, chunk_writer->num_of_pages()))) { + return ret; } else if (RET_FAIL(io_writer_->flush_chunk( chunk_writer->get_chunk_data()))) { + return ret; } else if (RET_FAIL(io_writer_->end_flush_chunk( chunk_writer->get_chunk_statistic()))) { + return ret; + } else { + chunk_writer->destroy(); } } return ret; diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h index f7d304b2..099c6467 100644 --- a/cpp/src/writer/tsfile_writer.h +++ b/cpp/src/writer/tsfile_writer.h @@ -59,6 +59,8 @@ class TsFileWriter { common::CompressionType compression_type); int write_record(const TsRecord &record); int write_tablet(const Tablet &tablet); + int64_t calculate_mem_size_for_all_group(); + int check_memory_size_and_may_flush_chunks(); /* * Flush buffer to disk file, but do not writer file index part. @@ -75,6 +77,7 @@ class TsFileWriter { private: int write_point(storage::ChunkWriter *chunk_writer, int64_t timestamp, const DataPoint &point); + bool check_chunk_group_empty(MeasurementSchemaGroup *chunk_group); int flush_chunk_group(MeasurementSchemaGroup *chunk_group); int write_typed_column(storage::ChunkWriter *chunk_writer, @@ -112,7 +115,10 @@ class TsFileWriter { storage::TsFileIOWriter *io_writer_; // device_name -> MeasurementSchemaGroup std::map<std::string, MeasurementSchemaGroup *> schemas_; - bool start_file_done_; + // record count since last flush + int64_t record_count_since_last_flush_; + // record count for next memory check + int64_t record_count_for_next_mem_check_; bool write_file_created_; };
