This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 6a5f8330 feature: finish multiple flush for c++_tsfile (#110)
6a5f8330 is described below
commit 6a5f83303adc313239546ba1848ed49e18afe691
Author: RkGrit <[email protected]>
AuthorDate: Tue Jul 2 17:01:44 2024 +0800
feature: finish multiple flush for c++_tsfile (#110)
* feature: finish multiple flush for c++_tsfile
* rewrite some code and add some comments
* add multi-flush for write_tablet()
---
cpp/src/common/config/config.h | 2 +
cpp/src/common/global.cc | 2 +
cpp/src/common/tsfile_common.h | 9 +++
cpp/src/encoding/bitpack_encoder.h | 43 ++++++++------
cpp/src/encoding/dictionary_encoder.h | 6 ++
cpp/src/encoding/encoder.h | 8 +++
cpp/src/encoding/gorilla_encoder.h | 6 +-
cpp/src/encoding/plain_encoder.h | 8 +--
cpp/src/encoding/ts2diff_encoder.h | 5 ++
cpp/src/file/tsfile_io_writer.cc | 2 -
cpp/src/file/tsfile_io_writer.h | 4 +-
cpp/src/writer/chunk_writer.cc | 9 +++
cpp/src/writer/chunk_writer.h | 2 +
cpp/src/writer/page_writer.h | 12 ++++
cpp/src/writer/tsfile_writer.cc | 104 ++++++++++++++++++++++++++++------
cpp/src/writer/tsfile_writer.h | 8 ++-
16 files changed, 183 insertions(+), 47 deletions(-)
diff --git a/cpp/src/common/config/config.h b/cpp/src/common/config/config.h
index 679a69b0..d2589229 100644
--- a/cpp/src/common/config/config.h
+++ b/cpp/src/common/config/config.h
@@ -49,6 +49,8 @@ typedef struct ConfigValue {
const char *tsfile_prefix_path_;
TSEncoding time_encoding_type_;
uint32_t memtable_flusher_poll_interval_seconds_;
+ int32_t chunk_group_size_threshold_;
+ int32_t record_count_for_next_mem_check_;
} ConfigValue;
extern void init_config_value();
diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc
index b54d4499..dfbbfc81 100644
--- a/cpp/src/common/global.cc
+++ b/cpp/src/common/global.cc
@@ -52,6 +52,8 @@ void init_config_value() {
g_config_value_.page_writer_max_memory_bytes_ = 128 * 1024; // 128 k
g_config_value_.max_degree_of_index_node_ = 256;
g_config_value_.tsfile_index_bloom_filter_error_percent_ = 0.05;
+ g_config_value_.record_count_for_next_mem_check_ = 100;
+ g_config_value_.chunk_group_size_threshold_ = 128 * 1024 * 1024;
// g_config_value_.tsfile_prefix_path_ = "./data";
g_config_value_.tsfile_prefix_path_ = "";
// g_config_value_.time_encoding_type_ = TS_2DIFF;
diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h
index a1920e25..601d5886 100644
--- a/cpp/src/common/tsfile_common.h
+++ b/cpp/src/common/tsfile_common.h
@@ -86,6 +86,15 @@ struct PageHeader {
}
return ret;
}
+
+    /**
+     * Upper bound, in bytes, of a page header that carries no statistics.
+     */
+    static int estimat_max_page_header_size_without_statistics() {
+        // Such a header holds two integers, uncompressedSize and
+        // compressedSize. Both are written as unsigned varInts, and one
+        // unsigned varInt costs at most 4 + 1 = 5 bytes.
+        const int max_var_uint_bytes = 4 + 1;
+        const int size_field_count = 2;
+        return size_field_count * max_var_uint_bytes;
+    }
+
#ifndef NDEBUG
friend std::ostream &operator<<(std::ostream &os, const PageHeader &h) {
os << "{uncompressed_size_=" << h.uncompressed_size_
diff --git a/cpp/src/encoding/bitpack_encoder.h b/cpp/src/encoding/bitpack_encoder.h
index 85a385bf..708e00a7 100644
--- a/cpp/src/encoding/bitpack_encoder.h
+++ b/cpp/src/encoding/bitpack_encoder.h
@@ -58,13 +58,12 @@ class BitPackEncoder {
packer_ = nullptr;
}
- void destroy() { /* do nothing for BitPackEncoder */
- delete (packer_);
- }
+ void destroy() { delete (packer_); }
void reset() {
num_buffered_values_ = 0;
bitpacked_group_count_ = 0;
+ bit_width_ = 0;
bytes_buffer_.clear();
byte_cache_.reset();
values_.clear();
@@ -74,11 +73,13 @@ class BitPackEncoder {
FORCE_INLINE void encode(int64_t value, common::ByteStream &out) {
values_.push_back(value);
+ int current_bit_width = 32 - number_of_leading_zeros(value);
+ if (current_bit_width > bit_width_) {
+ bit_width_ = current_bit_width;
+ }
}
void encode_flush(common::ByteStream &out) {
- // we get bit width after receiving all data
- bit_width_ = get_int_max_bit_width(values_);
ASSERT(packer_ == nullptr);
packer_ = new IntPacker(bit_width_);
common::SerializationUtil::write_i8(bit_width_, byte_cache_);
@@ -121,19 +122,6 @@ class BitPackEncoder {
common::mem_free(bytes);
}
- int get_int_max_bit_width(std::vector<int64_t> values) {
- // TODO: Optimization - find the maximum value first, and then calcuate
- // the bit width
- int max = 1;
- for (size_t i = 0; i < values.size(); i++) {
- int bitWidth = 64 - number_of_leading_zeros(values[i]);
- if (bitWidth > max) {
- max = bitWidth;
- }
- }
- return max;
- }
-
void flush(common::ByteStream &out) {
int last_bitpacked_num = num_buffered_values_;
if (num_buffered_values_ > 0) {
@@ -167,6 +155,25 @@ class BitPackEncoder {
bytes_buffer_.clear();
bitpacked_group_count_ = 0;
}
+
+    /**
+     * Estimate how many bytes the values currently buffered in values_ will
+     * take once bit-packed with the current bit_width_.
+     *
+     * Values are counted in groups of 8, and a trailing partial group is
+     * charged as a whole group.
+     *
+     * @return 0 when values_ is empty, otherwise the estimated byte count
+     */
+    int get_max_byte_size() {
+        if (values_.empty()) {
+            return 0;
+        }
+        const int total_values = static_cast<int>(values_.size());
+        // Round up so that a partial trailing group counts as a full one.
+        const int group_count = (total_values + 7) / 8;
+        // 8 values of bit_width_ bits each, rounded up to whole bytes.
+        const int bytes_per_group = (bit_width_ * 8 + 7) / 8;
+        // Add additional bytes, because each bitpack group has a header of
+        // 1 byte and a tail of 1 byte.
+        return group_count * (bytes_per_group + 1 + 1);
+    }
};
} // end namespace storage
diff --git a/cpp/src/encoding/dictionary_encoder.h b/cpp/src/encoding/dictionary_encoder.h
index 73c6a6f5..9ea3f6f0 100644
--- a/cpp/src/encoding/dictionary_encoder.h
+++ b/cpp/src/encoding/dictionary_encoder.h
@@ -95,6 +95,12 @@ class DictionaryEncoder {
void write_encoded_data(common::ByteStream &out) {
values_encoder_.encode_flush(out);
}
+
+    /**
+     * Estimated upper bound of the bytes held by this encoder: the
+     * dictionary-size field, map_size_, and whatever values_encoder_ reports.
+     */
+    int get_max_byte_size() {
+        // 4 bytes are needed for storing the dictionary size.
+        const int dictionary_size_field_bytes = 4;
+        return dictionary_size_field_bytes + map_size_ +
+               values_encoder_.get_max_byte_size();
+    }
};
} // end namespace storage
diff --git a/cpp/src/encoding/encoder.h b/cpp/src/encoding/encoder.h
index 99e5146a..9b52cfd4 100644
--- a/cpp/src/encoding/encoder.h
+++ b/cpp/src/encoding/encoder.h
@@ -38,6 +38,14 @@ class Encoder {
virtual int encode(float value, common::ByteStream &out_stream) = 0;
virtual int encode(double value, common::ByteStream &out_stream) = 0;
virtual int flush(common::ByteStream &out_stream) = 0;
+
+ /**
+ * Upper-bound estimate of the memory occupied by the current encoder.
+ * The output stream is not included in this value. Declared pure
+ * virtual, so every concrete encoder supplies its own estimate.
+ *
+ * @return the maximal size of possible memory occupied by current encoder
+ */
+ virtual int get_max_byte_size() = 0;
};
} // end namespace storage
diff --git a/cpp/src/encoding/gorilla_encoder.h b/cpp/src/encoding/gorilla_encoder.h
index 2ae580cd..47cedf18 100644
--- a/cpp/src/encoding/gorilla_encoder.h
+++ b/cpp/src/encoding/gorilla_encoder.h
@@ -119,7 +119,7 @@ class GorillaEncoder : public Encoder {
}
}
- int get_one_item_max_size();
+ int get_max_byte_size();
void write_first(T value, common::ByteStream &out);
void write_existing_leading(T xor_value, common::ByteStream &out);
void write_new_leading(T xor_value, int leading_zeros, int trailing_zeros,
@@ -145,11 +145,11 @@ class GorillaEncoder : public Encoder {
};
template <>
-FORCE_INLINE int GorillaEncoder<int32_t>::get_one_item_max_size() {
+FORCE_INLINE int GorillaEncoder<int32_t>::get_max_byte_size() {
return INT32_ONE_ITEM_MAX_SIZE;
}
template <>
-FORCE_INLINE int GorillaEncoder<int64_t>::get_one_item_max_size() {
+FORCE_INLINE int GorillaEncoder<int64_t>::get_max_byte_size() {
return INT64_ONE_ITEM_MAX_SIZE;
}
diff --git a/cpp/src/encoding/plain_encoder.h b/cpp/src/encoding/plain_encoder.h
index 556aea97..ab1e06f6 100644
--- a/cpp/src/encoding/plain_encoder.h
+++ b/cpp/src/encoding/plain_encoder.h
@@ -28,10 +28,8 @@ class PlainEncoder : public Encoder {
public:
PlainEncoder() {}
~PlainEncoder() { destroy(); }
- void destroy() { /* do nothing for PlainEncoder */
- }
- void reset() { /* do thing for PlainEncoder */
- }
+ void destroy() { /* do nothing for PlainEncoder */ }
+ void reset() { /* do thing for PlainEncoder */ }
FORCE_INLINE int encode(bool value, common::ByteStream &out_stream) {
return common::SerializationUtil::write_i8(value ? 1 : 0, out_stream);
@@ -57,6 +55,8 @@ class PlainEncoder : public Encoder {
// do nothing for PlainEncoder
return common::E_OK;
}
+
+ int get_max_byte_size() { return 0; }  // always 0: flush() has nothing to do for PlainEncoder
};
} // end namespace storage
diff --git a/cpp/src/encoding/ts2diff_encoder.h b/cpp/src/encoding/ts2diff_encoder.h
index 081c7fa7..67d2c83d 100644
--- a/cpp/src/encoding/ts2diff_encoder.h
+++ b/cpp/src/encoding/ts2diff_encoder.h
@@ -112,6 +112,11 @@ class TS2DIFFEncoder : public Encoder {
int flush(common::ByteStream &out_stream);
+    /**
+     * Estimated upper bound, in bytes, of what this encoder currently holds:
+     * a fixed 24-byte part plus 8 bytes for each unit of write_index_.
+     */
+    int get_max_byte_size() {
+        // The meaning of 24 is:
+        // index(4) + width(4) + minDeltaBase(8) + firstValue(8)
+        return 24 + write_index_ * 8;
+    }
+
public:
int block_size_;
T *delta_arr_;
diff --git a/cpp/src/file/tsfile_io_writer.cc b/cpp/src/file/tsfile_io_writer.cc
index ac1e1aa1..3880116e 100644
--- a/cpp/src/file/tsfile_io_writer.cc
+++ b/cpp/src/file/tsfile_io_writer.cc
@@ -168,8 +168,6 @@ int TsFileIOWriter::flush_chunk(ByteStream &chunk_data) {
} else if (RET_FAIL(flush_stream_to_file())) {
// log_err("flush stream error, ret=%d", ret);
}
-
- chunk_data.destroy();
return ret;
}
diff --git a/cpp/src/file/tsfile_io_writer.h b/cpp/src/file/tsfile_io_writer.h
index db594a12..092c543f 100644
--- a/cpp/src/file/tsfile_io_writer.h
+++ b/cpp/src/file/tsfile_io_writer.h
@@ -97,8 +97,8 @@ class TsFileIOWriter {
int end_flush_chunk_group();
int end_file();
- FORCE_INLINE std::vector<TimeseriesTimeIndexEntry>
- &get_ts_time_index_vector() {
+ FORCE_INLINE std::vector<TimeseriesTimeIndexEntry> &
+ get_ts_time_index_vector() {
return ts_time_index_vector_;
}
FORCE_INLINE std::string get_file_path() { return file_->get_file_path(); }
diff --git a/cpp/src/writer/chunk_writer.cc b/cpp/src/writer/chunk_writer.cc
index 51c78af5..a283fb96 100644
--- a/cpp/src/writer/chunk_writer.cc
+++ b/cpp/src/writer/chunk_writer.cc
@@ -63,6 +63,7 @@ void ChunkWriter::destroy() {
}
chunk_data_.destroy();
chunk_header_.reset();
+ num_of_pages_ = 0;
}
int ChunkWriter::seal_cur_page(bool end_chunk) {
@@ -151,4 +152,12 @@ int ChunkWriter::end_encode_chunk() {
return ret;
}
+
+/**
+ * Estimate the memory held for this series: the bytes in chunk_data_, the
+ * page writer's own estimate, the maximal page header size without
+ * statistics, and the size of the page statistic's type.
+ */
+int64_t ChunkWriter::estimate_max_series_mem_size() {
+    const auto chunk_bytes = chunk_data_.total_size();
+    const auto page_bytes = page_writer_.estimate_max_mem_size();
+    const auto header_bytes =
+        PageHeader::estimat_max_page_header_size_without_statistics();
+    const auto statistic_bytes =
+        get_typed_statistic_sizeof(page_writer_.get_statistic()->get_type());
+    return chunk_bytes + page_bytes + header_bytes + statistic_bytes;
+}
+
} // end namespace storage
diff --git a/cpp/src/writer/chunk_writer.h b/cpp/src/writer/chunk_writer.h
index c181ce98..54d51f54 100644
--- a/cpp/src/writer/chunk_writer.h
+++ b/cpp/src/writer/chunk_writer.h
@@ -93,6 +93,8 @@ class ChunkWriter {
return false;
}
+ int64_t estimate_max_series_mem_size();
+
private:
FORCE_INLINE bool is_cur_page_full() const {
// FIXME
diff --git a/cpp/src/writer/page_writer.h b/cpp/src/writer/page_writer.h
index 98b25179..6800c7ea 100644
--- a/cpp/src/writer/page_writer.h
+++ b/cpp/src/writer/page_writer.h
@@ -134,6 +134,18 @@ class PageWriter {
FORCE_INLINE uint32_t get_page_memory_size() const {
return time_out_stream_.total_size() + value_out_stream_.total_size();
}
+    /**
+     * Upper-bound estimate of the memory held for the page being built:
+     * the bytes in the time and value output streams plus what the time
+     * and value encoders report through get_max_byte_size().
+     *
+     * @return estimated size covering both streams and both encoders
+     */
+    FORCE_INLINE uint32_t estimate_max_mem_size() const {
+        const auto stream_bytes =
+            time_out_stream_.total_size() + value_out_stream_.total_size();
+        return stream_bytes + time_encoder_->get_max_byte_size() +
+               value_encoder_->get_max_byte_size();
+    }
int write_to_chunk(common::ByteStream &pages_data, bool write_header,
bool write_statistic, bool write_data_to_chunk_data);
FORCE_INLINE common::ByteStream &get_time_data() {
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 37fe6098..7d5db9f0 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -60,10 +60,10 @@ TsFileWriter::TsFileWriter()
: write_file_(nullptr),
io_writer_(nullptr),
schemas_(),
- start_file_done_(false),
- write_file_created_(false) {
- // do nothing.
-}
+ record_count_since_last_flush_(0),
+ record_count_for_next_mem_check_(
+ g_config_value_.record_count_for_next_mem_check_),
+ write_file_created_(false) {}
TsFileWriter::~TsFileWriter() { destroy(); }
@@ -94,6 +94,7 @@ void TsFileWriter::destroy() {
delete dev_iter->second;
}
schemas_.clear();
+ record_count_since_last_flush_ = 0;
}
int TsFileWriter::init(WriteFile *write_file) {
@@ -124,6 +125,12 @@ int TsFileWriter::open(const std::string &file_path, int flags, mode_t mode) {
if (RET_FAIL(write_file_->create(file_path, flags, mode))) {
} else {
io_writer_->init(write_file_);
+ if (RET_FAIL(io_writer_->start_file())) {
+ return ret;
+ }
+#if DEBUG_SE
+ std::cout << "finish writing magic code" << std::endl;
+#endif
}
return ret;
}
@@ -202,9 +209,7 @@ struct MeasurementNamesFromTablet {
template <typename MeasurementNamesGetter>
int TsFileWriter::do_check_schema(const std::string &device_name,
MeasurementNamesGetter &measurement_names,
- SimpleVector<ChunkWriter *> &chunk_writers)
-// std::vector<ChunkWriter*> &chunk_writers)
-{
+ SimpleVector<ChunkWriter *> &chunk_writers) {
int ret = E_OK;
DeviceSchemaIter dev_it = schemas_.find(device_name);
MeasurementSchemaGroup *device_schema = NULL;
@@ -220,8 +225,8 @@ int TsFileWriter::do_check_schema(const std::string &device_name,
if (UNLIKELY(ms_iter == msm.end())) {
chunk_writers.push_back(NULL);
} else {
- // Here we may check data_type against ms_iter. But in Java
- // libtsfile, no check here.
+ // In Java we will check data_type. But in C++, no check here.
+ // Because checks are performed at the chunk layer and page layer
MeasurementSchema *ms = ms_iter->second;
if (IS_NULL(ms->chunk_writer_)) {
ms->chunk_writer_ = new ChunkWriter;
@@ -242,6 +247,43 @@ int TsFileWriter::do_check_schema(const std::string &device_name,
return ret;
}
+/**
+ * Sum estimate_max_series_mem_size() over every measurement, of every device
+ * in schemas_, that already owns a ChunkWriter.
+ *
+ * @return the accumulated estimate; 0 when no measurement has a ChunkWriter
+ */
+int64_t TsFileWriter::calculate_mem_size_for_all_group() {
+    int64_t total_bytes = 0;
+    for (DeviceSchemaIter dev_it = schemas_.begin(); dev_it != schemas_.end();
+         ++dev_it) {
+        MeasurementSchemaMap &measurements =
+            dev_it->second->measurement_schema_map_;
+        MeasurementSchemaMapIter ms_it = measurements.begin();
+        while (ms_it != measurements.end()) {
+            ChunkWriter *writer = ms_it->second->chunk_writer_;
+            // Measurements without a ChunkWriter contribute nothing.
+            if (writer != NULL) {
+                total_bytes += writer->estimate_max_series_mem_size();
+            }
+            ++ms_it;
+        }
+    }
+    return total_bytes;
+}
+
+/**
+ * Check the estimated memory held by all chunk groups and, if it exceeds
+ * the chunkGroupSize threshold, flush them.
+ *
+ * The check only runs once record_count_since_last_flush_ has reached
+ * record_count_for_next_mem_check_. Each check re-derives the next check
+ * point from the observed records-per-byte ratio.
+ *
+ * @return E_OK when no flush was needed, otherwise the result of flush()
+ */
+int TsFileWriter::check_memory_size_and_may_flush_chunks() {
+    int ret = E_OK;
+    if (record_count_since_last_flush_ < record_count_for_next_mem_check_) {
+        return ret;
+    }
+    const int64_t mem_size = calculate_mem_size_for_all_group();
+    if (mem_size <= 0) {
+        // No chunk writer holds data (for example every measurement
+        // resolved to a NULL writer). There is nothing to flush, and the
+        // ratio below would divide by zero, so keep the current check point.
+        return ret;
+    }
+    const int64_t threshold =
+        common::g_config_value_.chunk_group_size_threshold_;
+    record_count_for_next_mem_check_ =
+        record_count_since_last_flush_ * threshold / mem_size;
+    if (mem_size > threshold) {
+        ret = flush();
+    }
+    return ret;
+}
+
int TsFileWriter::write_record(const TsRecord &record) {
int ret = E_OK;
// std::vector<ChunkWriter*> chunk_writers;
@@ -261,6 +303,9 @@ int TsFileWriter::write_record(const TsRecord &record) {
// ignore point writer failure
write_point(chunk_writer, record.timestamp_, record.points_[c]);
}
+
+ record_count_since_last_flush_++;
+ ret = check_memory_size_and_may_flush_chunks();
return ret;
}
@@ -303,6 +348,9 @@ int TsFileWriter::write_tablet(const Tablet &tablet) {
// ignore writer failure
write_column(chunk_writer, tablet, c);
}
+
+ record_count_since_last_flush_ += tablet.max_rows_;
+ ret = check_memory_size_and_may_flush_chunks();
return ret;
}
@@ -381,27 +429,44 @@ int TsFileWriter::write_typed_column(ChunkWriter *chunk_writer,
// TODO make sure ret is meaningful to SDK user
int TsFileWriter::flush() {
int ret = E_OK;
- if (!start_file_done_) {
- if (RET_FAIL(io_writer_->start_file())) {
- return ret;
- }
- start_file_done_ = true;
- }
- std::cout << "finish writing magic code" << std::endl;
/* since @schemas_ used std::map which is rbtree underlying,
so map itself is ordered by device name. */
std::map<std::string, MeasurementSchemaGroup *>::iterator device_iter;
for (device_iter = schemas_.begin(); device_iter != schemas_.end();
- device_iter++) { // cppcheck-suppress postfixOperator
+ device_iter++) {
+ if (check_chunk_group_empty(device_iter->second)) {
+ continue;
+ }
+
if (RET_FAIL(io_writer_->start_flush_chunk_group(device_iter->first)))
{
+ return ret;
} else if (RET_FAIL(flush_chunk_group(device_iter->second))) {
+ return ret;
} else if (RET_FAIL(io_writer_->end_flush_chunk_group())) {
+ return ret;
}
}
+ record_count_since_last_flush_ = 0;
return ret;
}
+/**
+ * Report whether a chunk group has nothing to flush.
+ *
+ * @param chunk_group the device's measurement schema group to inspect
+ * @return false as soon as one measurement owns a ChunkWriter whose
+ *         num_of_pages() is positive, true otherwise
+ */
+bool TsFileWriter::check_chunk_group_empty(
+    MeasurementSchemaGroup *chunk_group) {
+    MeasurementSchemaMap &measurements = chunk_group->measurement_schema_map_;
+    for (MeasurementSchemaMapIter it = measurements.begin();
+         it != measurements.end(); ++it) {
+        ChunkWriter *writer = it->second->chunk_writer_;
+        // The NULL test avoids flushing an empty chunk group on the first
+        // flush; the page-count test avoids flushing the same data twice.
+        if (writer == NULL || writer->num_of_pages() <= 0) {
+            continue;
+        }
+        return false;
+    }
+    return true;
+}
+
int TsFileWriter::flush_chunk_group(MeasurementSchemaGroup *chunk_group) {
int ret = E_OK;
MeasurementSchemaMap &map = chunk_group->measurement_schema_map_;
@@ -415,10 +480,15 @@ int TsFileWriter::flush_chunk_group(MeasurementSchemaGroup *chunk_group) {
m_schema->measurement_name_, m_schema->data_type_,
m_schema->encoding_, m_schema->compression_type_,
chunk_writer->num_of_pages()))) {
+ return ret;
} else if (RET_FAIL(io_writer_->flush_chunk(
chunk_writer->get_chunk_data()))) {
+ return ret;
} else if (RET_FAIL(io_writer_->end_flush_chunk(
chunk_writer->get_chunk_statistic()))) {
+ return ret;
+ } else {
+ chunk_writer->destroy();
}
}
return ret;
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h
index f7d304b2..099c6467 100644
--- a/cpp/src/writer/tsfile_writer.h
+++ b/cpp/src/writer/tsfile_writer.h
@@ -59,6 +59,8 @@ class TsFileWriter {
common::CompressionType compression_type);
int write_record(const TsRecord &record);
int write_tablet(const Tablet &tablet);
+ int64_t calculate_mem_size_for_all_group();
+ int check_memory_size_and_may_flush_chunks();
/*
* Flush buffer to disk file, but do not writer file index part.
@@ -75,6 +77,7 @@ class TsFileWriter {
private:
int write_point(storage::ChunkWriter *chunk_writer, int64_t timestamp,
const DataPoint &point);
+ bool check_chunk_group_empty(MeasurementSchemaGroup *chunk_group);
int flush_chunk_group(MeasurementSchemaGroup *chunk_group);
int write_typed_column(storage::ChunkWriter *chunk_writer,
@@ -112,7 +115,10 @@ class TsFileWriter {
storage::TsFileIOWriter *io_writer_;
// device_name -> MeasurementSchemaGroup
std::map<std::string, MeasurementSchemaGroup *> schemas_;
- bool start_file_done_;
+ // record count since last flush
+ int64_t record_count_since_last_flush_;
+ // record count for next memory check
+ int64_t record_count_for_next_mem_check_;
bool write_file_created_;
};