This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 53dd0a13a5 [FIX](map)fix map batch append data with right
next_array_item_rowid (#23779)
53dd0a13a5 is described below
commit 53dd0a13a57f69254c25b12daf9e3d6cdb41cc5c
Author: Kang <[email protected]>
AuthorDate: Mon Sep 4 23:40:07 2023 +0800
[FIX](map)fix map batch append data with right next_array_item_rowid
(#23779)
---
be/src/olap/rowset/segment_v2/column_writer.cpp | 53 ++++++++++++++++++++-----
be/src/olap/rowset/segment_v2/column_writer.h | 31 ++++++++++++---
be/src/vec/olap/olap_data_convertor.cpp | 8 +++-
3 files changed, 75 insertions(+), 17 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp
b/be/src/olap/rowset/segment_v2/column_writer.cpp
index ec2baa10f0..04431142fd 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -314,7 +314,7 @@ Status ColumnWriter::create(const ColumnWriterOptions&
opts, const TabletColumn*
length_column.set_index_length(-1); // no short key index
std::unique_ptr<Field>
bigint_field(FieldFactory::create(length_column));
auto* length_writer =
- new ScalarColumnWriter(length_options,
std::move(bigint_field), file_writer);
+ new OffsetColumnWriter(length_options,
std::move(bigint_field), file_writer);
// create null writer
if (opts.meta->is_nullable()) {
@@ -731,6 +731,48 @@ Status ScalarColumnWriter::finish_current_page() {
////////////////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////////////
+// offset column writer
+////////////////////////////////////////////////////////////////////////////////
+
+OffsetColumnWriter::OffsetColumnWriter(const ColumnWriterOptions& opts,
+                                       std::unique_ptr<Field> field, io::FileWriter* file_writer)
+        : ScalarColumnWriter(opts, std::move(field), file_writer) {
+    // Offsets are interpreted as uint64 only. `field` was moved into the base class in the
+    // initializer list above (ScalarColumnWriter takes it by value), so the parameter is now
+    // null; check the type through the base-owned field instead.
+    DCHECK(get_field()->type() == FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT);
+}
+
+OffsetColumnWriter::~OffsetColumnWriter() = default;
+
+Status OffsetColumnWriter::init() {
+    // Set up the underlying scalar page writer first; propagate any failure.
+    RETURN_IF_ERROR(ScalarColumnWriter::init());
+    // Nothing has been appended yet.
+    _next_offset = 0;
+    // Have finish_current_page() call back into put_extra_info_in_page() so each finished
+    // data page footer records the offset that follows its last row.
+    register_flush_page_callback(this);
+    return Status::OK();
+}
+
+// Appends `num_rows` offsets, finishing pages as they fill up.
+//
+// Precondition: `*ptr` must point at num_rows + 1 uint64 offsets. The trailing entry is
+// never written as a row; it is only read so that, whenever a page is finished, the offset
+// following that page's last row is known (the map/array data convertors reserve
+// num_rows + 1 offsets for this reason). Passing exactly num_rows offsets reads past the
+// end of the caller's buffer.
+Status OffsetColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
+    size_t remaining = num_rows;
+    while (remaining > 0) {
+        size_t num_written = remaining;
+        // append_data_in_current_page() is expected to advance *ptr past the rows it wrote;
+        // the read below depends on that.
+        RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written));
+        // *ptr now points at the offset following the last row just written. Remember it so
+        // put_extra_info_in_page() can store it as the page's next_array_item_ordinal when
+        // finish_current_page() runs (here or later, e.g. on finish()).
+        _next_offset = *reinterpret_cast<const uint64_t*>(*ptr);
+        remaining -= num_written;
+
+        if (_page_builder->is_page_full()) {
+            RETURN_IF_ERROR(finish_current_page());
+        }
+    }
+    return Status::OK();
+}
+
+// Flush-page callback: stamps the footer of the page being finished with the offset that
+// follows the page's last row, so the column reader can read items according to offsets.
+Status OffsetColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
+    const uint64_t next_item_ordinal = _next_offset;
+    footer->set_next_array_item_ordinal(next_item_ordinal);
+    return Status::OK();
+}
+
StructColumnWriter::StructColumnWriter(
const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
ScalarColumnWriter* null_writer,
@@ -1008,7 +1050,7 @@ Status ArrayColumnWriter::finish_current_page() {
/// ============================= MapColumnWriter =====================////
MapColumnWriter::MapColumnWriter(const ColumnWriterOptions& opts,
std::unique_ptr<Field> field,
- ScalarColumnWriter* null_writer,
ScalarColumnWriter* offset_writer,
+ ScalarColumnWriter* null_writer,
OffsetColumnWriter* offset_writer,
std::vector<std::unique_ptr<ColumnWriter>>&
kv_writers)
: ColumnWriter(std::move(field), opts.meta->is_nullable()),
_opts(opts) {
CHECK_EQ(kv_writers.size(), 2);
@@ -1028,7 +1070,6 @@ Status MapColumnWriter::init() {
}
// here register_flush_page_callback to call this.put_extra_info_in_page()
// when finish cur data page
- _offsets_writer->register_flush_page_callback(this);
for (auto& sub_writer : _kv_writers) {
RETURN_IF_ERROR(sub_writer->init());
}
@@ -1138,12 +1179,6 @@ Status MapColumnWriter::finish_current_page() {
return Status::NotSupported("map writer has no data, can not
finish_current_page");
}
-// write this value for column reader to read according offsets
-Status MapColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
- footer->set_next_array_item_ordinal(_kv_writers[0]->get_next_rowid());
- return Status::OK();
-}
-
Status MapColumnWriter::write_inverted_index() {
if (_opts.inverted_index) {
return _inverted_index_builder->finish();
diff --git a/be/src/olap/rowset/segment_v2/column_writer.h
b/be/src/olap/rowset/segment_v2/column_writer.h
index b5aabd4e3a..dfda60b53f 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.h
+++ b/be/src/olap/rowset/segment_v2/column_writer.h
@@ -171,7 +171,7 @@ public:
// Because some columns would be stored in a file, we should wait
// until all columns has been finished, and then data can be written
// to file
-class ScalarColumnWriter final : public ColumnWriter {
+class ScalarColumnWriter : public ColumnWriter {
public:
ScalarColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field>
field,
io::FileWriter* file_writer);
@@ -208,6 +208,7 @@ public:
Status append_data_in_current_page(const uint8_t* ptr, size_t*
num_written);
friend class ArrayColumnWriter;
+ friend class OffsetColumnWriter;
private:
std::unique_ptr<PageBuilder> _page_builder;
@@ -276,6 +277,26 @@ private:
FlushPageCallback* _new_page_callback = nullptr;
};
+// OffsetColumnWriter writes the offset column of a nested type whose items live in a
+// separate column (e.g. map). Values are interpreted as uint64 only. A page's item range
+// ends where the next row's items begin; that offset (the value following the page's last
+// row) is not itself a row of the page, so it is recorded in
+// footer.next_array_item_ordinal by the put_extra_info_in_page() flush callback, which
+// finish_current_page() invokes.
+class OffsetColumnWriter final : public ScalarColumnWriter, private FlushPageCallback {
+public:
+    OffsetColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
+                       io::FileWriter* file_writer);
+
+    ~OffsetColumnWriter() override;
+
+    Status init() override;
+
+    // Precondition: *ptr points at num_rows + 1 offsets. Only num_rows are written as rows;
+    // the trailing one is read to fill next_array_item_ordinal of finished pages.
+    Status append_data(const uint8_t** ptr, size_t num_rows) override;
+
+private:
+    Status put_extra_info_in_page(DataPageFooterPB* footer) override;
+
+    // Offset following the last row appended so far; written into the footer of the page
+    // being finished. Default-initialized so it is never read indeterminate before init().
+    uint64_t _next_offset = 0;
+};
+
class StructColumnWriter final : public ColumnWriter {
public:
explicit StructColumnWriter(const ColumnWriterOptions& opts,
std::unique_ptr<Field> field,
@@ -385,10 +406,10 @@ private:
ColumnWriterOptions _opts;
};
-class MapColumnWriter final : public ColumnWriter, public FlushPageCallback {
+class MapColumnWriter final : public ColumnWriter {
public:
explicit MapColumnWriter(const ColumnWriterOptions& opts,
std::unique_ptr<Field> field,
- ScalarColumnWriter* null_writer,
ScalarColumnWriter* offsets_writer,
+ ScalarColumnWriter* null_writer,
OffsetColumnWriter* offsets_writer,
std::vector<std::unique_ptr<ColumnWriter>>&
_kv_writers);
~MapColumnWriter() override = default;
@@ -432,12 +453,10 @@ public:
ordinal_t get_next_rowid() const override { return
_offsets_writer->get_next_rowid(); }
private:
- Status put_extra_info_in_page(DataPageFooterPB* header) override;
-
std::vector<std::unique_ptr<ColumnWriter>> _kv_writers;
// we need null writer to make sure a row is null or not
std::unique_ptr<ScalarColumnWriter> _null_writer;
- std::unique_ptr<ScalarColumnWriter> _offsets_writer;
+ std::unique_ptr<OffsetColumnWriter> _offsets_writer;
std::unique_ptr<InvertedIndexColumnWriter> _inverted_index_builder;
ColumnWriterOptions _opts;
};
diff --git a/be/src/vec/olap/olap_data_convertor.cpp
b/be/src/vec/olap/olap_data_convertor.cpp
index 181f1cd477..d339accc7b 100644
--- a/be/src/vec/olap/olap_data_convertor.cpp
+++ b/be/src/vec/olap/olap_data_convertor.cpp
@@ -943,6 +943,8 @@ Status
OlapBlockDataConvertor::OlapColumnDataConvertorArray::convert_to_olap(
auto elem_size = end_offset - start_offset;
_offsets.clear();
+ // we need all offsets, so reserve num_rows + 1 to make sure last offset
can be got in offset column,
+ // instead of according to nested item column
_offsets.reserve(_num_rows + 1);
for (int i = 0; i <= _num_rows; ++i) {
_offsets.push_back(column_array->offset_at(i + _row_pos) -
start_offset + _base_offset);
@@ -1011,8 +1013,10 @@ Status
OlapBlockDataConvertor::OlapColumnDataConvertorMap::convert_to_olap(
auto elem_size = end_offset - start_offset;
_offsets.clear();
- _offsets.reserve(_num_rows);
- for (int i = 0; i < _num_rows; ++i) {
+ // we need all offsets, so reserve num_rows + 1 to make sure last offset
can be got in offset column,
+ // instead of according to nested item column
+ _offsets.reserve(_num_rows + 1);
+ for (int i = 0; i <= _num_rows; ++i) {
_offsets.push_back(column_map->offset_at(i + _row_pos) - start_offset
+ _base_offset);
}
_base_offset += elem_size;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]