github-actions[bot] commented on code in PR #16371:
URL: https://github.com/apache/doris/pull/16371#discussion_r1094196033
##########
be/src/olap/schema_change.cpp:
##########
@@ -586,6 +592,248 @@ Status
VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_
return Status::OK();
}
+SchemaChangeForInvertedIndex::SchemaChangeForInvertedIndex(
+ const std::vector<TOlapTableIndex>& alter_inverted_indexs,
+ const TabletSchemaSPtr& tablet_schema)
+ : SchemaChange(),
Review Comment:
warning: initializer for base class 'doris::SchemaChange' is redundant
[readability-redundant-member-init]
```suggestion
: _alter_inverted_indexs(alter_inverted_indexs),
_tablet_schema(tablet_schema) {}
```
##########
be/src/olap/schema_change.cpp:
##########
@@ -586,6 +592,248 @@
return Status::OK();
}
+SchemaChangeForInvertedIndex::SchemaChangeForInvertedIndex(
+ const std::vector<TOlapTableIndex>& alter_inverted_indexs,
+ const TabletSchemaSPtr& tablet_schema)
+ : SchemaChange(),
+ _alter_inverted_indexs(alter_inverted_indexs),
+ _tablet_schema(tablet_schema) {}
+
+SchemaChangeForInvertedIndex::~SchemaChangeForInvertedIndex() {
+ VLOG_NOTICE << "~SchemaChangeForInvertedIndex()";
+ _inverted_index_builders.clear();
+ _index_metas.clear();
+}
+
+Status SchemaChangeForInvertedIndex::process(RowsetReaderSharedPtr
rowset_reader,
+ RowsetWriter* rowset_writer,
+ TabletSharedPtr new_tablet,
+ TabletSharedPtr base_tablet,
+ TabletSchemaSPtr
base_tablet_schema) {
+ Status res = Status::OK();
+ if (rowset_reader->rowset()->empty() ||
rowset_reader->rowset()->num_rows() == 0) {
+ return Status::OK();
+ }
+
+ std::vector<ColumnId> return_columns;
+ for (auto& inverted_index : _alter_inverted_indexs) {
+ DCHECK_EQ(inverted_index.columns.size(), 1);
+ auto column_name = inverted_index.columns[0];
+ auto idx = _tablet_schema->field_index(column_name);
+ return_columns.emplace_back(idx);
+ }
+
+ // create inverted index writer
+ auto rowset_meta = rowset_reader->rowset()->rowset_meta();
+ std::string segment_dir = base_tablet->tablet_path();
+ auto fs = rowset_meta->fs();
+ for (auto i = 0; i < rowset_meta->num_segments(); ++i) {
+ std::string segment_filename =
+ fmt::format("{}_{}.dat", rowset_meta->rowset_id().to_string(),
i);
+ for (auto& inverted_index : _alter_inverted_indexs) {
+ DCHECK_EQ(inverted_index.columns.size(), 1);
+ auto column_name = inverted_index.columns[0];
+ auto column = _tablet_schema->column(column_name);
+ auto index_id = inverted_index.index_id;
+
+ std::unique_ptr<Field> field(FieldFactory::create(column));
+ _index_metas.emplace_back(new TabletIndex());
+ _index_metas.back()->init_from_thrift(inverted_index,
*_tablet_schema);
+ std::unique_ptr<segment_v2::InvertedIndexColumnWriter>
inverted_index_builder;
+ try {
+ RETURN_IF_ERROR(segment_v2::InvertedIndexColumnWriter::create(
+ field.get(), &inverted_index_builder,
segment_filename, segment_dir,
+ _index_metas.back().get(), fs));
+ } catch (const std::exception& e) {
+ LOG(WARNING) << "CLuceneError occured: " << e.what();
+ return Status::Error<IO_ERROR>();
+ }
+
+ if (inverted_index_builder) {
+ std::string writer_sign = fmt::format("{}_{}", i, index_id);
+ _inverted_index_builders.insert(
+ std::make_pair(writer_sign,
std::move(inverted_index_builder)));
+ }
+ }
+ }
+
+ SegmentCacheHandle segment_cache_handle;
+ // load segments
+ RETURN_NOT_OK(SegmentLoader::instance()->load_segments(
+ std::static_pointer_cast<BetaRowset>(rowset_reader->rowset()),
&segment_cache_handle,
+ false));
+
+ // create iterator for each segment
+ StorageReadOptions read_options;
+ OlapReaderStatistics stats;
+ read_options.stats = &stats;
+ read_options.tablet_schema = _tablet_schema;
+ std::unique_ptr<Schema> schema =
+ std::make_unique<Schema>(_tablet_schema->columns(),
return_columns);
+ for (auto& seg_ptr : segment_cache_handle.get_segments()) {
+ std::unique_ptr<RowwiseIterator> iter;
+ res = seg_ptr->new_iterator(*schema, read_options, &iter);
+ if (!res.ok()) {
+ LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
+ << "]: " << res.to_string();
+ return Status::Error<ROWSET_READER_INIT>();
+ }
+
+ std::shared_ptr<vectorized::Block> block =
+
std::make_shared<vectorized::Block>(_tablet_schema->create_block(return_columns));
+ do {
+ block->clear_column_data();
+ res = iter->next_batch(block.get());
+ if (!res.ok()) {
+ if (res.is<END_OF_FILE>()) {
+ break;
+ }
+ RETURN_NOT_OK_LOG(
+ res, "failed to read next block when schema change for inverted index.");
+ }
+
+ // copy block
+ auto ref_block = *block;
+
+ // write inverted index
+ if (_write_inverted_index(iter->data_id(), &ref_block) !=
Status::OK()) {
+ res = Status::Error<SCHEMA_CHANGE_INFO_INVALID>();
+ LOG(WARNING) << "failed to write block.";
+ return res;
+ }
+ } while (block->rows() != 0);
+ }
+
+ // finish write inverted index, flush data to compound file
+ for (auto i = 0; i < rowset_meta->num_segments(); ++i) {
+ for (auto& inverted_index : _alter_inverted_indexs) {
+ DCHECK_EQ(inverted_index.columns.size(), 1);
+ auto column_name = inverted_index.columns[0];
+ auto column = _tablet_schema->column(column_name);
+ auto index_id = inverted_index.index_id;
+ std::string writer_sign = fmt::format("{}_{}", i, index_id);
+ try {
+ if (_inverted_index_builders[writer_sign]) {
+ _inverted_index_builders[writer_sign]->finish();
+ }
+ } catch (const std::exception& e) {
+ LOG(WARNING) << "CLuceneError occured: " << e.what();
+ return Status::Error<IO_ERROR>();
+ }
+ }
+ }
+ _inverted_index_builders.clear();
+ _index_metas.clear();
+
+ LOG(INFO) << "all row nums. source_rows=" <<
rowset_reader->rowset()->num_rows();
+ return res;
+}
+
+Status SchemaChangeForInvertedIndex::_add_nullable(const std::string&
column_name,
+ const std::string&
index_writer_sign,
+ Field* field, const
uint8_t* null_map,
+ const uint8_t** ptr, size_t
num_rows) {
Review Comment:
warning: statement should be inside braces
[readability-braces-around-statements]
```suggestion
if (null_map[offset] == null_map[i]) {
```
be/src/olap/schema_change.cpp:736:
```diff
- else
+ } else
```
##########
be/src/olap/schema_change.h:
##########
@@ -172,11 +177,42 @@ class VSchemaChangeWithSorting : public SchemaChange {
std::unique_ptr<MemTracker> _mem_tracker;
};
+class SchemaChangeForInvertedIndex : public SchemaChange {
+public:
+ explicit SchemaChangeForInvertedIndex(const std::vector<TOlapTableIndex>&
alter_inverted_indexs,
+ const TabletSchemaSPtr&
tablet_schema);
+ virtual ~SchemaChangeForInvertedIndex();
+
Review Comment:
warning: prefer using 'override' or (rarely) 'final' instead of 'virtual'
[modernize-use-override]
```suggestion
~SchemaChangeForInvertedIndex() override;
```
##########
be/src/olap/schema_change.h:
##########
@@ -172,11 +177,42 @@
std::unique_ptr<MemTracker> _mem_tracker;
};
+class SchemaChangeForInvertedIndex : public SchemaChange {
+public:
+ explicit SchemaChangeForInvertedIndex(const std::vector<TOlapTableIndex>&
alter_inverted_indexs,
+ const TabletSchemaSPtr&
tablet_schema);
+ virtual ~SchemaChangeForInvertedIndex();
+
+ virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter*
rowset_writer,
+ TabletSharedPtr new_tablet, TabletSharedPtr
base_tablet,
+ TabletSchemaSPtr base_tablet_schema) override;
+
+private:
+ DISALLOW_COPY_AND_ASSIGN(SchemaChangeForInvertedIndex);
+ Status _write_inverted_index(int32_t segment_idx, vectorized::Block*
block);
+ Status _add_data(const std::string& column_name, const std::string&
index_writer_sign,
+ Field* field, const uint8_t** ptr, size_t num_rows);
+ Status _add_nullable(const std::string& column_name, const std::string&
index_writer_sign,
+ Field* field, const uint8_t* null_map, const
uint8_t** ptr,
+ size_t num_rows);
+
+private:
+ std::vector<TOlapTableIndex> _alter_inverted_indexs;
+ TabletSchemaSPtr _tablet_schema;
+
+ // "segment_id_unique_id" -> InvertedIndexColumnWriter
+ std::unordered_map<std::string,
std::unique_ptr<segment_v2::InvertedIndexColumnWriter>>
+ _inverted_index_builders;
+ std::vector<std::unique_ptr<TabletIndex>> _index_metas;
Review Comment:
warning: redundant access specifier has the same accessibility as the
previous access specifier [readability-redundant-access-specifiers]
```suggestion
```
**be/src/olap/schema_change.h:189:** previously declared here
```cpp
private:
^
```
##########
be/src/olap/schema_change.cpp:
##########
@@ -586,6 +592,248 @@
return Status::OK();
}
+SchemaChangeForInvertedIndex::SchemaChangeForInvertedIndex(
+ const std::vector<TOlapTableIndex>& alter_inverted_indexs,
+ const TabletSchemaSPtr& tablet_schema)
+ : SchemaChange(),
+ _alter_inverted_indexs(alter_inverted_indexs),
+ _tablet_schema(tablet_schema) {}
+
+SchemaChangeForInvertedIndex::~SchemaChangeForInvertedIndex() {
+ VLOG_NOTICE << "~SchemaChangeForInvertedIndex()";
+ _inverted_index_builders.clear();
+ _index_metas.clear();
+}
+
+Status SchemaChangeForInvertedIndex::process(RowsetReaderSharedPtr
rowset_reader,
+ RowsetWriter* rowset_writer,
+ TabletSharedPtr new_tablet,
+ TabletSharedPtr base_tablet,
+ TabletSchemaSPtr
base_tablet_schema) {
+ Status res = Status::OK();
+ if (rowset_reader->rowset()->empty() ||
rowset_reader->rowset()->num_rows() == 0) {
+ return Status::OK();
+ }
+
+ std::vector<ColumnId> return_columns;
+ for (auto& inverted_index : _alter_inverted_indexs) {
+ DCHECK_EQ(inverted_index.columns.size(), 1);
+ auto column_name = inverted_index.columns[0];
+ auto idx = _tablet_schema->field_index(column_name);
+ return_columns.emplace_back(idx);
+ }
+
+ // create inverted index writer
+ auto rowset_meta = rowset_reader->rowset()->rowset_meta();
+ std::string segment_dir = base_tablet->tablet_path();
+ auto fs = rowset_meta->fs();
+ for (auto i = 0; i < rowset_meta->num_segments(); ++i) {
+ std::string segment_filename =
+ fmt::format("{}_{}.dat", rowset_meta->rowset_id().to_string(),
i);
+ for (auto& inverted_index : _alter_inverted_indexs) {
+ DCHECK_EQ(inverted_index.columns.size(), 1);
+ auto column_name = inverted_index.columns[0];
+ auto column = _tablet_schema->column(column_name);
+ auto index_id = inverted_index.index_id;
+
+ std::unique_ptr<Field> field(FieldFactory::create(column));
+ _index_metas.emplace_back(new TabletIndex());
+ _index_metas.back()->init_from_thrift(inverted_index,
*_tablet_schema);
+ std::unique_ptr<segment_v2::InvertedIndexColumnWriter>
inverted_index_builder;
+ try {
+ RETURN_IF_ERROR(segment_v2::InvertedIndexColumnWriter::create(
+ field.get(), &inverted_index_builder,
segment_filename, segment_dir,
+ _index_metas.back().get(), fs));
+ } catch (const std::exception& e) {
+ LOG(WARNING) << "CLuceneError occured: " << e.what();
+ return Status::Error<IO_ERROR>();
+ }
+
+ if (inverted_index_builder) {
+ std::string writer_sign = fmt::format("{}_{}", i, index_id);
+ _inverted_index_builders.insert(
+ std::make_pair(writer_sign,
std::move(inverted_index_builder)));
+ }
+ }
+ }
+
+ SegmentCacheHandle segment_cache_handle;
+ // load segments
+ RETURN_NOT_OK(SegmentLoader::instance()->load_segments(
+ std::static_pointer_cast<BetaRowset>(rowset_reader->rowset()),
&segment_cache_handle,
+ false));
+
+ // create iterator for each segment
+ StorageReadOptions read_options;
+ OlapReaderStatistics stats;
+ read_options.stats = &stats;
+ read_options.tablet_schema = _tablet_schema;
+ std::unique_ptr<Schema> schema =
+ std::make_unique<Schema>(_tablet_schema->columns(),
return_columns);
+ for (auto& seg_ptr : segment_cache_handle.get_segments()) {
+ std::unique_ptr<RowwiseIterator> iter;
+ res = seg_ptr->new_iterator(*schema, read_options, &iter);
+ if (!res.ok()) {
+ LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
+ << "]: " << res.to_string();
+ return Status::Error<ROWSET_READER_INIT>();
+ }
+
+ std::shared_ptr<vectorized::Block> block =
+
std::make_shared<vectorized::Block>(_tablet_schema->create_block(return_columns));
+ do {
+ block->clear_column_data();
+ res = iter->next_batch(block.get());
+ if (!res.ok()) {
+ if (res.is<END_OF_FILE>()) {
+ break;
+ }
+ RETURN_NOT_OK_LOG(
+ res, "failed to read next block when schema change for inverted index.");
+ }
+
+ // copy block
+ auto ref_block = *block;
+
+ // write inverted index
+ if (_write_inverted_index(iter->data_id(), &ref_block) !=
Status::OK()) {
+ res = Status::Error<SCHEMA_CHANGE_INFO_INVALID>();
+ LOG(WARNING) << "failed to write block.";
+ return res;
+ }
+ } while (block->rows() != 0);
+ }
+
+ // finish write inverted index, flush data to compound file
+ for (auto i = 0; i < rowset_meta->num_segments(); ++i) {
+ for (auto& inverted_index : _alter_inverted_indexs) {
+ DCHECK_EQ(inverted_index.columns.size(), 1);
+ auto column_name = inverted_index.columns[0];
+ auto column = _tablet_schema->column(column_name);
+ auto index_id = inverted_index.index_id;
+ std::string writer_sign = fmt::format("{}_{}", i, index_id);
+ try {
+ if (_inverted_index_builders[writer_sign]) {
+ _inverted_index_builders[writer_sign]->finish();
+ }
+ } catch (const std::exception& e) {
+ LOG(WARNING) << "CLuceneError occured: " << e.what();
+ return Status::Error<IO_ERROR>();
+ }
+ }
+ }
+ _inverted_index_builders.clear();
+ _index_metas.clear();
+
+ LOG(INFO) << "all row nums. source_rows=" <<
rowset_reader->rowset()->num_rows();
+ return res;
+}
+
+Status SchemaChangeForInvertedIndex::_add_nullable(const std::string&
column_name,
+ const std::string&
index_writer_sign,
+ Field* field, const
uint8_t* null_map,
+ const uint8_t** ptr, size_t
num_rows) {
+ size_t offset = 0;
+ auto next_run_step = [&]() {
+ size_t step = 1;
Review Comment:
warning: statement should be inside braces
[readability-braces-around-statements]
```suggestion
else {
break;
}
```
##########
be/src/olap/task/engine_alter_tablet_task.h:
##########
@@ -39,4 +39,18 @@
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // EngineTask
+class EngineAlterInvertedIndexTask : public EngineTask {
+public:
+ virtual Status execute();
+
+public:
Review Comment:
warning: redundant access specifier has the same accessibility as the
previous access specifier [readability-redundant-access-specifiers]
```suggestion
```
**be/src/olap/task/engine_alter_tablet_task.h:42:** previously declared here
```cpp
public:
^
```
##########
be/src/olap/tablet_schema.cpp:
##########
@@ -492,6 +492,33 @@ void TabletIndex::init_from_thrift(const TOlapTableIndex&
index,
}
}
+void TabletIndex::init_from_thrift(const TOlapTableIndex& index,
+ const std::vector<int32_t>& column_uids) {
+ _index_id = index.index_id;
+ _index_name = index.index_name;
Review Comment:
warning: std::move of the const variable 'column_uids' has no effect; remove
std::move() or make the variable non-const [performance-move-const-arg]
```suggestion
_col_unique_ids = column_uids;
```
##########
be/src/olap/schema_change.h:
##########
@@ -172,11 +177,42 @@
std::unique_ptr<MemTracker> _mem_tracker;
};
+class SchemaChangeForInvertedIndex : public SchemaChange {
+public:
+ explicit SchemaChangeForInvertedIndex(const std::vector<TOlapTableIndex>&
alter_inverted_indexs,
+ const TabletSchemaSPtr&
tablet_schema);
+ virtual ~SchemaChangeForInvertedIndex();
+
+ virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter*
rowset_writer,
+ TabletSharedPtr new_tablet, TabletSharedPtr
base_tablet,
Review Comment:
warning: 'virtual' is redundant since the function is already declared
'override' [modernize-use-override]
```suggestion
Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
```
##########
be/src/olap/task/engine_alter_tablet_task.h:
##########
@@ -39,4 +39,18 @@ class EngineAlterTabletTask : public EngineTask {
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // EngineTask
+class EngineAlterInvertedIndexTask : public EngineTask {
+public:
+ virtual Status execute();
Review Comment:
warning: prefer using 'override' or (rarely) 'final' instead of 'virtual'
[modernize-use-override]
```suggestion
Status execute() override;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]