airborne12 commented on code in PR #61383: URL: https://github.com/apache/doris/pull/61383#discussion_r2963754135
########## be/src/storage/segment/variant/variant_streaming_compaction_writer.cpp: ########## @@ -0,0 +1,313 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "storage/segment/variant/variant_streaming_compaction_writer.h" + +#include <memory> + +#include "common/cast_set.h" +#include "core/column/column_nullable.h" +#include "core/column/column_variant.h" +#include "exec/common/variant_util.h" +#include "storage/index/indexed_column_writer.h" +#include "storage/iterator/olap_data_convertor.h" +#include "storage/segment/variant/variant_writer_helpers.h" +#include "storage/types.h" + +namespace doris::segment_v2 { + +#include "common/compile_check_begin.h" + +VariantStreamingCompactionWriter::VariantStreamingCompactionWriter( + const ColumnWriterOptions& opts, const TabletColumn* column, + NestedGroupWriteProvider* nested_group_provider, VariantStatistics* statistics) + : _opts(opts), + _tablet_column(column), + _nested_group_provider(nested_group_provider), + _statistics(statistics) {} + +Status VariantStreamingCompactionWriter::init() { + RETURN_IF_ERROR(build_nested_group_streaming_write_plan(_opts.input_rs_readers, *_tablet_column, + &_streaming_plan)); + RETURN_IF_ERROR(_init_root_writer()); + int 
column_id = 1; + RETURN_IF_ERROR(_init_regular_subcolumn_writers(column_id)); + RETURN_IF_ERROR(_nested_group_provider->init_with_plan(_streaming_plan, _tablet_column, _opts, + &column_id, _statistics)); + _statistics->to_pb(_opts.meta->mutable_variant_statistics()); + _phase = Phase::INITIALIZED; + return Status::OK(); +} + +Status VariantStreamingCompactionWriter::_init_root_writer() { + _root_writer = std::make_unique<ScalarColumnWriter>( + _opts, std::unique_ptr<StorageField>(StorageFieldFactory::create(*_tablet_column)), + _opts.file_writer); + RETURN_IF_ERROR(_root_writer->init()); + _opts.meta->set_num_rows(0); + return Status::OK(); +} + +Status VariantStreamingCompactionWriter::_init_regular_subcolumn_writers(int& column_id) { + _streaming_regular_subcolumn_writers.clear(); + for (const auto& plan_entry : _streaming_plan.regular_subcolumns) { + TabletColumn tablet_column; + TabletIndexes subcolumn_indexes; + ColumnWriterOptions opts; + std::unique_ptr<ColumnWriter> writer; + RETURN_IF_ERROR(variant_writer_helpers::prepare_subcolumn_writer_target( + _opts, *_tablet_column, column_id, plan_entry.path_in_data, plan_entry.data_type, 0, + 0, nullptr /* existing_subcolumn_info */, false /* check_storage_type */, + &subcolumn_indexes, &opts, &writer, &tablet_column)); + auto converter = std::make_unique<OlapBlockDataConvertor>(); + converter->add_column_data_convertor(tablet_column); + _subcolumns_indexes.push_back(std::move(subcolumn_indexes)); + _subcolumn_opts.push_back(opts); + _subcolumn_writers.push_back(std::move(writer)); + _streaming_regular_subcolumn_writers.push_back( + StreamingRegularSubcolumnWriter {.plan = plan_entry, + .tablet_column = std::move(tablet_column), + .converter = std::move(converter)}); + ++column_id; + } + return Status::OK(); +} + +Status VariantStreamingCompactionWriter::append_data(const uint8_t** ptr, size_t num_rows, + const uint8_t* outer_null_map) { + RETURN_IF_ERROR(_check_initialized("append_data")); + 
RETURN_IF_ERROR(_append_input_from_raw(ptr, num_rows, outer_null_map)); + if (num_rows > 0 && _phase == Phase::INITIALIZED) { + _phase = Phase::APPENDING; + } + return Status::OK(); +} + +Status VariantStreamingCompactionWriter::_append_input_from_raw(const uint8_t** ptr, + size_t num_rows, + const uint8_t* outer_null_map) { + const auto* column = reinterpret_cast<const VariantColumnData*>(*ptr); + const auto& src = *reinterpret_cast<const ColumnVariant*>(column->column_data); + RETURN_IF_ERROR(src.sanitize()); + return _append_input(src, column->row_pos, num_rows, outer_null_map); +} + +Status VariantStreamingCompactionWriter::_append_input(const ColumnVariant& src, size_t row_pos, + size_t num_rows, + const uint8_t* outer_null_map) { + auto chunk_variant = ColumnVariant::create(0); + chunk_variant->insert_range_from(src, row_pos, num_rows); + RETURN_IF_ERROR(chunk_variant->sanitize()); + chunk_variant->finalize(); + return _append_chunk(*chunk_variant, outer_null_map); +} + +Status VariantStreamingCompactionWriter::_append_root_column(const ColumnVariant& chunk_variant, + const uint8_t* outer_null_map) { + auto* variant = const_cast<ColumnVariant*>(&chunk_variant); Review Comment: **P3 — `const_cast` on `const ColumnVariant&` parameter** Both `_append_root_column` and `_append_regular_subcolumns` accept `const ColumnVariant&` but mutate the argument via `const_cast`. The actual caller always passes a locally-created mutable copy, so this is safe in practice. Nit: changing the parameter to `ColumnVariant&` (along with `_append_chunk`) would make the contract honest. 
########## be/src/exec/common/variant_util.cpp: ########## @@ -969,6 +983,16 @@ Status VariantCompactionUtil::check_path_stats(const std::vector<RowsetSharedPtr return Status::OK(); } } + for (const auto& column : output->tablet_schema()->columns()) { + if (column->is_variant_type() && !should_check_variant_path_stats(*column)) { + return Status::OK(); + } + } + for (const auto& column : output->tablet_schema()->columns()) { + if (!column->is_variant_type()) { + continue; Review Comment: **P2 — Empty loop body (likely refactoring leftover)** This loop iterates over output columns, skips non-variant ones with `continue`, but does nothing with the variant columns. Looks like the loop body was removed during refactoring but the skeleton was left behind. The loop should either be deleted or have its intended logic filled in. ########## be/src/exec/common/variant_util.h: ########## @@ -72,6 +72,18 @@ bool glob_match_re2(const std::string& glob_pattern, const std::string& candidat using PathToNoneNullValues = std::unordered_map<std::string, int64_t>; using PathToDataTypes = std::unordered_map<PathInData, std::vector<DataTypePtr>, PathInData::Hash>; +inline bool should_record_variant_path_stats(const TabletColumn& column) { Review Comment: **P2 — Three `should_*` helpers have identical implementations** `should_record_variant_path_stats`, `should_write_variant_binary_columns`, and `should_check_variant_path_stats` all return `!column.variant_enable_nested_group()`. If they are expected to diverge in the future, please add a comment explaining the intended semantic differences. Otherwise consider consolidating them into a single helper, e.g. `is_nested_group_variant()` (note that this name has the opposite polarity, so existing call sites would need to negate it).
########## be/src/exec/common/variant_util.cpp: ########## @@ -936,6 +943,13 @@ Status VariantCompactionUtil::check_path_stats(const std::vector<RowsetSharedPtr if (output->tablet_schema()->num_variant_columns() == 0) { return Status::OK(); } + for (const auto& rowset : intputs) { + for (const auto& column : rowset->tablet_schema()->columns()) { + if (column->is_variant_type() && !should_check_variant_path_stats(*column)) { + return Status::OK(); Review Comment: **P1 — `check_path_stats` early return is too coarse-grained** This `return Status::OK()` bails out of the **entire** `check_path_stats` function when any single variant column has NG enabled. Since `variant_enable_nested_group` is a per-column property, a schema with both NG and non-NG variant columns would skip path-stats validation for the non-NG columns too. The same pattern appears again at line 988 for the output schema. Since this function validates data integrity after compaction, skipping it silently is risky. Consider changing to per-column `continue` in the downstream `aggregate_path_to_stats` loop instead of a function-level `return`. ########## be/src/storage/segment/variant/variant_streaming_compaction_writer.cpp: ########## @@ -0,0 +1,313 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "storage/segment/variant/variant_streaming_compaction_writer.h" + +#include <memory> + +#include "common/cast_set.h" +#include "core/column/column_nullable.h" +#include "core/column/column_variant.h" +#include "exec/common/variant_util.h" +#include "storage/index/indexed_column_writer.h" +#include "storage/iterator/olap_data_convertor.h" +#include "storage/segment/variant/variant_writer_helpers.h" +#include "storage/types.h" + +namespace doris::segment_v2 { + +#include "common/compile_check_begin.h" + +VariantStreamingCompactionWriter::VariantStreamingCompactionWriter( + const ColumnWriterOptions& opts, const TabletColumn* column, + NestedGroupWriteProvider* nested_group_provider, VariantStatistics* statistics) + : _opts(opts), + _tablet_column(column), + _nested_group_provider(nested_group_provider), + _statistics(statistics) {} + +Status VariantStreamingCompactionWriter::init() { + RETURN_IF_ERROR(build_nested_group_streaming_write_plan(_opts.input_rs_readers, *_tablet_column, + &_streaming_plan)); + RETURN_IF_ERROR(_init_root_writer()); + int column_id = 1; + RETURN_IF_ERROR(_init_regular_subcolumn_writers(column_id)); + RETURN_IF_ERROR(_nested_group_provider->init_with_plan(_streaming_plan, _tablet_column, _opts, + &column_id, _statistics)); + _statistics->to_pb(_opts.meta->mutable_variant_statistics()); + _phase = Phase::INITIALIZED; + return Status::OK(); +} + +Status VariantStreamingCompactionWriter::_init_root_writer() { + _root_writer = std::make_unique<ScalarColumnWriter>( + _opts, std::unique_ptr<StorageField>(StorageFieldFactory::create(*_tablet_column)), + _opts.file_writer); + RETURN_IF_ERROR(_root_writer->init()); + _opts.meta->set_num_rows(0); + return Status::OK(); +} + +Status VariantStreamingCompactionWriter::_init_regular_subcolumn_writers(int& column_id) { + _streaming_regular_subcolumn_writers.clear(); + for (const auto& 
plan_entry : _streaming_plan.regular_subcolumns) { + TabletColumn tablet_column; + TabletIndexes subcolumn_indexes; + ColumnWriterOptions opts; + std::unique_ptr<ColumnWriter> writer; + RETURN_IF_ERROR(variant_writer_helpers::prepare_subcolumn_writer_target( + _opts, *_tablet_column, column_id, plan_entry.path_in_data, plan_entry.data_type, 0, + 0, nullptr /* existing_subcolumn_info */, false /* check_storage_type */, + &subcolumn_indexes, &opts, &writer, &tablet_column)); + auto converter = std::make_unique<OlapBlockDataConvertor>(); + converter->add_column_data_convertor(tablet_column); + _subcolumns_indexes.push_back(std::move(subcolumn_indexes)); + _subcolumn_opts.push_back(opts); + _subcolumn_writers.push_back(std::move(writer)); + _streaming_regular_subcolumn_writers.push_back( + StreamingRegularSubcolumnWriter {.plan = plan_entry, + .tablet_column = std::move(tablet_column), + .converter = std::move(converter)}); + ++column_id; + } + return Status::OK(); +} + +Status VariantStreamingCompactionWriter::append_data(const uint8_t** ptr, size_t num_rows, + const uint8_t* outer_null_map) { + RETURN_IF_ERROR(_check_initialized("append_data")); + RETURN_IF_ERROR(_append_input_from_raw(ptr, num_rows, outer_null_map)); + if (num_rows > 0 && _phase == Phase::INITIALIZED) { + _phase = Phase::APPENDING; + } + return Status::OK(); +} + +Status VariantStreamingCompactionWriter::_append_input_from_raw(const uint8_t** ptr, + size_t num_rows, + const uint8_t* outer_null_map) { + const auto* column = reinterpret_cast<const VariantColumnData*>(*ptr); + const auto& src = *reinterpret_cast<const ColumnVariant*>(column->column_data); + RETURN_IF_ERROR(src.sanitize()); + return _append_input(src, column->row_pos, num_rows, outer_null_map); +} + +Status VariantStreamingCompactionWriter::_append_input(const ColumnVariant& src, size_t row_pos, + size_t num_rows, + const uint8_t* outer_null_map) { + auto chunk_variant = ColumnVariant::create(0); + chunk_variant->insert_range_from(src, 
row_pos, num_rows); + RETURN_IF_ERROR(chunk_variant->sanitize()); + chunk_variant->finalize(); Review Comment: **P3 — Redundant `is_finalized()` check after whole-column `finalize()`** `_append_input` calls `chunk_variant->finalize()` here, so by the time `_append_regular_subcolumns` checks `!subcolumn->is_finalized()` (line 181), `is_finalized()` will always be true and the guarded branch never runs. The defensive check doesn't hurt correctness but is dead code on this path. ########## be/src/exec/common/variant_util.cpp: ########## @@ -848,6 +856,10 @@ Status VariantCompactionUtil::aggregate_variant_extended_info( if (!column->is_variant_type()) { continue; } + if (column->variant_enable_nested_group()) { Review Comment: **P2 — NG column skips `nested_paths` collection without explanation** When `variant_enable_nested_group()` is true, this `continue` skips the entire segment traversal including `get_nested_paths()`. The streaming write plan collects paths independently via `VariantColumnReader::get_nested_group_readers()`, so this is likely correct — but a brief comment explaining why the skip is safe would help future readers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
