github-actions[bot] commented on code in PR #60482: URL: https://github.com/apache/doris/pull/60482#discussion_r2977675016
########## be/src/format/transformer/merge_partitioner.cpp: ########## @@ -0,0 +1,365 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/transformer/merge_partitioner.h" + +#include <algorithm> +#include <cstdint> + +#include "common/cast_set.h" +#include "common/config.h" +#include "common/logging.h" +#include "common/status.h" +#include "core/block/block.h" +#include "core/column/column_const.h" +#include "core/column/column_nullable.h" +#include "core/column/column_vector.h" +#include "format/transformer/iceberg_partition_function.h" + +namespace doris { +#include "common/compile_check_begin.h" + +namespace { +constexpr int8_t kInsertOperation = 1; +constexpr int8_t kDeleteOperation = 2; +constexpr int8_t kUpdateOperation = 3; +constexpr int8_t kUpdateInsertOperation = 4; +constexpr int8_t kUpdateDeleteOperation = 5; + +int64_t scale_threshold_by_task(int64_t value, int task_num) { + if (task_num <= 0) { + return value; + } + int64_t scaled = value / task_num; + return scaled == 0 ? 
value : scaled; +} +} // namespace + +MergePartitioner::MergePartitioner(size_t partition_count, const TMergePartitionInfo& merge_info, + bool use_new_shuffle_hash_method) + : PartitionerBase(static_cast<HashValType>(partition_count)), + _merge_info(merge_info), + _use_new_shuffle_hash_method(use_new_shuffle_hash_method), + _insert_random(merge_info.insert_random) {} + +Status MergePartitioner::init(const std::vector<TExpr>& /*texprs*/) { + VExprContextSPtr op_ctx; + RETURN_IF_ERROR(VExpr::create_expr_tree(_merge_info.operation_expr, op_ctx)); + _operation_expr_ctxs.emplace_back(std::move(op_ctx)); + + std::vector<TExpr> insert_exprs; + std::vector<TIcebergPartitionField> insert_fields; + if (_merge_info.__isset.insert_partition_exprs) { + insert_exprs = _merge_info.insert_partition_exprs; + } + if (_merge_info.__isset.insert_partition_fields) { + insert_fields = _merge_info.insert_partition_fields; + } + if (!insert_exprs.empty() || !insert_fields.empty()) { + _insert_partition_function = std::make_unique<IcebergInsertPartitionFunction>( + _partition_count, _hash_method(), std::move(insert_exprs), + std::move(insert_fields)); + RETURN_IF_ERROR(_insert_partition_function->init({})); + } + + if (_merge_info.__isset.delete_partition_exprs && !_merge_info.delete_partition_exprs.empty()) { + _delete_partition_function = std::make_unique<IcebergDeletePartitionFunction>( + _partition_count, _hash_method(), _merge_info.delete_partition_exprs); + RETURN_IF_ERROR(_delete_partition_function->init({})); + } + return Status::OK(); +} + +Status MergePartitioner::prepare(RuntimeState* state, const RowDescriptor& row_desc) { + RETURN_IF_ERROR(VExpr::prepare(_operation_expr_ctxs, state, row_desc)); + if (_insert_partition_function != nullptr) { + RETURN_IF_ERROR(_insert_partition_function->prepare(state, row_desc)); + } + if (_delete_partition_function != nullptr) { + RETURN_IF_ERROR(_delete_partition_function->prepare(state, row_desc)); + } + return Status::OK(); +} + +Status 
MergePartitioner::open(RuntimeState* state) { + RETURN_IF_ERROR(VExpr::open(_operation_expr_ctxs, state)); + if (_insert_partition_function != nullptr) { + RETURN_IF_ERROR(_insert_partition_function->open(state)); + if (auto* insert_function = + dynamic_cast<IcebergInsertPartitionFunction*>(_insert_partition_function.get()); + insert_function != nullptr && insert_function->fallback_to_random()) { + _insert_random = true; + } + } + if (_delete_partition_function != nullptr) { + RETURN_IF_ERROR(_delete_partition_function->open(state)); + } + _init_insert_scaling(state); + return Status::OK(); +} + +Status MergePartitioner::close(RuntimeState* /*state*/) { + return Status::OK(); +} + +Status MergePartitioner::do_partitioning(RuntimeState* state, Block* block) const { + const size_t rows = block->rows(); + if (rows == 0) { + _channel_ids.clear(); + return Status::OK(); + } + + const size_t column_to_keep = block->columns(); + if (_operation_expr_ctxs.empty()) { + return Status::InternalError("Merge partitioning missing operation expression"); + } + + int op_idx = -1; + RETURN_IF_ERROR(_operation_expr_ctxs[0]->execute(block, &op_idx)); + if (op_idx < 0 || op_idx >= block->columns()) { + return Status::InternalError("Merge partitioning missing operation column"); + } + if (op_idx >= cast_set<int>(column_to_keep)) { + return Status::InternalError("Merge partitioning requires operation column in input block"); + } + + const auto& op_column = block->get_by_position(op_idx).column; + const auto* op_data = remove_nullable(op_column).get(); + std::vector<int8_t> ops(rows); + bool has_insert = false; + bool has_delete = false; + bool has_update = false; + for (size_t i = 0; i < rows; ++i) { + int8_t op = static_cast<int8_t>(op_data->get_int(i)); + ops[i] = op; + if (_is_insert_op(op)) { + has_insert = true; + } + if (_is_delete_op(op)) { + has_delete = true; + } + if (op == kUpdateOperation) { + has_update = true; + } + } + + if (has_insert && !_insert_random && 
_insert_partition_function == nullptr) { + return Status::InternalError("Merge partitioning insert exprs are empty"); + } + if (has_delete && _delete_partition_function == nullptr) { + return Status::InternalError("Merge partitioning delete exprs are empty"); + } + + std::vector<uint32_t> insert_hashes; + std::vector<uint32_t> delete_hashes; + const size_t insert_partition_count = + _enable_insert_rebalance ? _insert_partition_count : _partition_count; + if (has_insert && !_insert_random) { + RETURN_IF_ERROR(_insert_partition_function->get_partitions( + state, block, insert_partition_count, insert_hashes)); + } + if (has_delete) { + RETURN_IF_ERROR(_delete_partition_function->get_partitions(state, block, _partition_count, + delete_hashes)); + } + if (has_insert) { + if (_insert_random) { + if (_non_partition_scaling_threshold > 0) { + _insert_data_processed += static_cast<int64_t>(block->bytes()); + if (_insert_writer_count < static_cast<int>(_partition_count) && + _insert_data_processed >= + _insert_writer_count * _non_partition_scaling_threshold) { + _insert_writer_count++; + } + } else { + _insert_writer_count = static_cast<int>(_partition_count); + } + } else if (_enable_insert_rebalance) { + _apply_insert_rebalance(ops, insert_hashes, block->bytes()); + } + } + + Block::erase_useless_column(block, column_to_keep); + + _channel_ids.resize(rows); + for (size_t i = 0; i < rows; ++i) { + const int8_t op = ops[i]; + if (op == kUpdateOperation) { + _channel_ids[i] = delete_hashes[i]; + continue; + } + if (_is_insert_op(op)) { + _channel_ids[i] = _insert_random ? 
_next_rr_channel() : insert_hashes[i]; + } else if (_is_delete_op(op)) { + _channel_ids[i] = delete_hashes[i]; + } else { + return Status::InternalError("Unknown Iceberg merge operation {}", op); + } + } + + if (has_update) { + for (size_t col_idx = 0; col_idx < block->columns(); ++col_idx) { + block->replace_by_position_if_const(col_idx); + } + + MutableColumns mutable_columns = block->mutate_columns(); + MutableColumnPtr& op_mut = mutable_columns[op_idx]; + ColumnInt8* op_values_col = nullptr; + if (auto* nullable_col = check_and_get_column<ColumnNullable>(op_mut.get())) { + op_values_col = + check_and_get_column<ColumnInt8>(nullable_col->get_nested_column_ptr().get()); + } else { + op_values_col = check_and_get_column<ColumnInt8>(op_mut.get()); + } + if (op_values_col == nullptr) { + block->set_columns(std::move(mutable_columns)); + return Status::InternalError("Merge operation column must be tinyint"); + } + auto& op_values = op_values_col->get_data(); + const size_t original_rows = rows; + for (size_t row = 0; row < original_rows; ++row) { + if (ops[row] != kUpdateOperation) { + continue; + } + op_values[row] = kUpdateDeleteOperation; + for (size_t col_idx = 0; col_idx < mutable_columns.size(); ++col_idx) { + mutable_columns[col_idx]->insert_from(*mutable_columns[col_idx], row); + } Review Comment: **CRITICAL: Use-after-free (self-referential `insert_from`)** When `op == kUpdateOperation`, this line duplicates a row from `mutable_columns[col_idx]` back into the **same** column: ```cpp mutable_columns[col_idx]->insert_from(*mutable_columns[col_idx], row); ``` For `ColumnString`, `insert_from` calls `chars.resize(new_size)` which can reallocate the internal buffer, then reads from `src.chars[offset]` — but `src` IS `*this`, so the source pointer is now dangling. This is a classic self-referential container insertion bug that causes use-after-free / data corruption. 
**Fix**: Clone each row's data into a temporary `Block` or row-holder first, then append from the temporary. Alternatively, collect all update rows' indices first, then bulk-duplicate them from a snapshot of the original columns. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java: ########## @@ -226,6 +249,210 @@ private void updateManifestAfterRewrite() { } } + /** + * Begin delete operation for Iceberg table + */ + public void beginDelete(ExternalTable dorisTable) throws UserException { + try { + ops.getExecutionAuthenticator().execute(() -> { + // create and start the iceberg transaction + this.table = IcebergUtils.getIcebergTable(dorisTable); + this.baseSnapshotId = getSnapshotIdIfPresent(table); + // // Verify table format version (must be v2+ for delete support) + // String formatVersionStr = table.properties().get("format-version"); Review Comment: **HIGH: Format version v2 check is commented out** Position delete files require Iceberg table format version >= 2. This validation is commented out. Operating against a v1 table will lead to runtime errors from the Iceberg library when attempting to create position delete files, or potentially silent data corruption. This check should be active, or at minimum there should be a clear comment explaining why it's deferred (e.g., if the Iceberg library itself validates this). ########## be/src/format/transformer/iceberg_partition_function.cpp: ########## @@ -0,0 +1,395 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/transformer/iceberg_partition_function.h" + +#include "common/cast_set.h" +#include "common/exception.h" +#include "common/logging.h" +#include "common/status.h" +#include "core/column/column_const.h" +#include "core/column/column_nullable.h" +#include "core/column/column_struct.h" +#include "core/data_type/data_type_struct.h" +#include "exec/sink/writer/iceberg/partition_transformers.h" +#include "format/table/iceberg/partition_spec.h" +#include "util/string_util.h" + +namespace doris { +#include "common/compile_check_begin.h" + +using HashValType = PartitionerBase::HashValType; + +static void initialize_shuffle_hashes(std::vector<HashValType>& hashes, size_t rows, + ShuffleHashMethod method) { + hashes.resize(rows); + if (method == ShuffleHashMethod::CRC32C) { + constexpr HashValType CRC32C_SHUFFLE_SEED = 0x9E3779B9U; + std::fill(hashes.begin(), hashes.end(), CRC32C_SHUFFLE_SEED); + } else { + std::fill(hashes.begin(), hashes.end(), 0); + } +} + +static void update_shuffle_hashes(const ColumnPtr& column, const DataTypePtr& type, + HashValType* __restrict result, ShuffleHashMethod method) { + if (method == ShuffleHashMethod::CRC32C) { + column->update_crc32c_batch(result, nullptr); + } else { + column->update_crcs_with_value(result, type->get_primitive_type(), + cast_set<HashValType>(column->size())); + } +} + +static void apply_shuffle_channel_ids(std::vector<HashValType>& hashes, size_t partition_count, + ShuffleHashMethod method) { + for (auto& h : hashes) { + if (method == ShuffleHashMethod::CRC32C) { + h = 
crc32c_shuffle_mix(h) % partition_count; + } else { + h = h % partition_count; + } + } +} + +IcebergInsertPartitionFunction::IcebergInsertPartitionFunction( + HashValType partition_count, ShuffleHashMethod hash_method, + std::vector<TExpr> partition_exprs, std::vector<TIcebergPartitionField> partition_fields) + : _partition_count(partition_count), + _hash_method(hash_method), + _partition_exprs(std::move(partition_exprs)), + _partition_fields_spec(std::move(partition_fields)) {} + +Status IcebergInsertPartitionFunction::init(const std::vector<TExpr>& texprs) { + const auto& exprs = _partition_exprs.empty() ? texprs : _partition_exprs; + if (!exprs.empty()) { + RETURN_IF_ERROR(VExpr::create_expr_trees(exprs, _partition_expr_ctxs)); + } + if (!_partition_fields_spec.empty()) { + _partition_fields.reserve(_partition_fields_spec.size()); + for (const auto& field : _partition_fields_spec) { + VExprContextSPtr ctx; + RETURN_IF_ERROR(VExpr::create_expr_tree(field.source_expr, ctx)); + InsertPartitionField insert_field; + insert_field.transform = field.transform; + insert_field.expr_ctx = std::move(ctx); + insert_field.source_id = field.__isset.source_id ? field.source_id : 0; + insert_field.name = field.__isset.name ? 
field.name : ""; + _partition_fields.emplace_back(std::move(insert_field)); + } + } + return Status::OK(); +} + +Status IcebergInsertPartitionFunction::prepare(RuntimeState* state, const RowDescriptor& row_desc) { + RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, row_desc)); + if (!_partition_fields.empty()) { + VExprContextSPtrs field_ctxs; + field_ctxs.reserve(_partition_fields.size()); + for (const auto& field : _partition_fields) { + field_ctxs.emplace_back(field.expr_ctx); + } + RETURN_IF_ERROR(VExpr::prepare(field_ctxs, state, row_desc)); + } + return Status::OK(); +} + +Status IcebergInsertPartitionFunction::open(RuntimeState* state) { + RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state)); + if (!_partition_fields.empty()) { + VExprContextSPtrs field_ctxs; + field_ctxs.reserve(_partition_fields.size()); + for (const auto& field : _partition_fields) { + field_ctxs.emplace_back(field.expr_ctx); + } + RETURN_IF_ERROR(VExpr::open(field_ctxs, state)); + for (auto& field : _partition_fields) { + try { + doris::iceberg::PartitionField partition_field(field.source_id, 0, field.name, + field.transform); + field.transformer = PartitionColumnTransforms::create( + partition_field, field.expr_ctx->root()->data_type()); + } catch (const doris::Exception& e) { + LOG(WARNING) << "Merge partitioning fallback to RR: " << e.what(); + _fallback_to_random = true; + _partition_fields.clear(); + break; + } + } + } + return Status::OK(); +} + +Status IcebergInsertPartitionFunction::get_partitions(RuntimeState* /*state*/, Block* block, + size_t partition_count, + std::vector<HashValType>& partitions) const { + if (_fallback_to_random) { + return Status::InternalError("Merge partitioning fallback to random"); + } + if (partition_count == 0) { + return Status::InternalError("Partition count is zero"); + } + if (!_partition_fields.empty()) { + RETURN_IF_ERROR(_compute_hashes_with_transform(block, partitions)); + } else { + 
RETURN_IF_ERROR(_compute_hashes_with_exprs(block, partitions)); + } + apply_shuffle_channel_ids(partitions, partition_count, _hash_method); + return Status::OK(); +} + +Status IcebergInsertPartitionFunction::clone(RuntimeState* state, + std::unique_ptr<PartitionFunction>& function) const { + auto* new_function = new IcebergInsertPartitionFunction( + _partition_count, _hash_method, _partition_exprs, _partition_fields_spec); + function.reset(new_function); + RETURN_IF_ERROR( + _clone_expr_ctxs(state, _partition_expr_ctxs, new_function->_partition_expr_ctxs)); + if (!_partition_fields.empty()) { + VExprContextSPtrs src_field_ctxs; + src_field_ctxs.reserve(_partition_fields.size()); + for (const auto& field : _partition_fields) { + src_field_ctxs.emplace_back(field.expr_ctx); + } + VExprContextSPtrs dst_field_ctxs; + RETURN_IF_ERROR(_clone_expr_ctxs(state, src_field_ctxs, dst_field_ctxs)); + new_function->_partition_fields.reserve(dst_field_ctxs.size()); + for (size_t i = 0; i < dst_field_ctxs.size(); ++i) { + InsertPartitionField field; + field.transform = _partition_fields[i].transform; + field.expr_ctx = dst_field_ctxs[i]; + field.source_id = _partition_fields[i].source_id; + field.name = _partition_fields[i].name; + new_function->_partition_fields.emplace_back(std::move(field)); + } + } + new_function->_fallback_to_random = _fallback_to_random; + return Status::OK(); +} + +Status IcebergInsertPartitionFunction::_compute_hashes_with_transform( + Block* block, std::vector<HashValType>& partitions) const { + const size_t rows = block->rows(); + if (rows == 0) { + partitions.clear(); + return Status::OK(); + } + if (_partition_fields.empty()) { + return Status::InternalError("Merge partitioning insert fields are empty"); + } + + std::vector<int> results(_partition_fields.size()); + for (size_t i = 0; i < _partition_fields.size(); ++i) { + RETURN_IF_ERROR(_partition_fields[i].expr_ctx->execute(block, &results[i])); + } + + initialize_shuffle_hashes(partitions, rows, 
_hash_method); + auto* __restrict hash_values = partitions.data(); + for (size_t i = 0; i < _partition_fields.size(); ++i) { + if (_partition_fields[i].transformer == nullptr) { + return Status::InternalError("Merge partitioning transform is not initialized"); + } + ColumnWithTypeAndName transformed = + _partition_fields[i].transformer->apply(*block, results[i]); + const auto& [column, is_const] = unpack_if_const(transformed.column); + if (is_const) { + continue; Review Comment: **MEDIUM: Const columns skipped in hash computation** When a partition column is const (same value for all rows in a block), its hash contribution is skipped entirely (`continue`). If different blocks have different const values for the same partition column, rows that should go to different partitions will hash identically, causing incorrect data routing. For example: block A has `partition_col = 'a'` (const), block B has `partition_col = 'b'` (const). Both blocks will produce the same hash values since the partition column's contribution is skipped. Consider materializing const columns before hashing, or computing the const value's hash contribution once and applying it to all rows. ########## be/src/format/transformer/merge_partitioner.cpp: ########## @@ -0,0 +1,365 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "format/transformer/merge_partitioner.h" + +#include <algorithm> +#include <cstdint> + +#include "common/cast_set.h" +#include "common/config.h" +#include "common/logging.h" +#include "common/status.h" +#include "core/block/block.h" +#include "core/column/column_const.h" +#include "core/column/column_nullable.h" +#include "core/column/column_vector.h" +#include "format/transformer/iceberg_partition_function.h" + +namespace doris { +#include "common/compile_check_begin.h" + +namespace { +constexpr int8_t kInsertOperation = 1; +constexpr int8_t kDeleteOperation = 2; +constexpr int8_t kUpdateOperation = 3; +constexpr int8_t kUpdateInsertOperation = 4; +constexpr int8_t kUpdateDeleteOperation = 5; + +int64_t scale_threshold_by_task(int64_t value, int task_num) { + if (task_num <= 0) { + return value; + } + int64_t scaled = value / task_num; + return scaled == 0 ? value : scaled; +} +} // namespace + +MergePartitioner::MergePartitioner(size_t partition_count, const TMergePartitionInfo& merge_info, + bool use_new_shuffle_hash_method) + : PartitionerBase(static_cast<HashValType>(partition_count)), + _merge_info(merge_info), + _use_new_shuffle_hash_method(use_new_shuffle_hash_method), + _insert_random(merge_info.insert_random) {} + +Status MergePartitioner::init(const std::vector<TExpr>& /*texprs*/) { + VExprContextSPtr op_ctx; + RETURN_IF_ERROR(VExpr::create_expr_tree(_merge_info.operation_expr, op_ctx)); + _operation_expr_ctxs.emplace_back(std::move(op_ctx)); + + std::vector<TExpr> insert_exprs; + std::vector<TIcebergPartitionField> insert_fields; + if (_merge_info.__isset.insert_partition_exprs) { + insert_exprs = _merge_info.insert_partition_exprs; + } + if (_merge_info.__isset.insert_partition_fields) { + insert_fields = _merge_info.insert_partition_fields; + } + if (!insert_exprs.empty() || !insert_fields.empty()) { + _insert_partition_function = 
std::make_unique<IcebergInsertPartitionFunction>( + _partition_count, _hash_method(), std::move(insert_exprs), + std::move(insert_fields)); + RETURN_IF_ERROR(_insert_partition_function->init({})); + } + + if (_merge_info.__isset.delete_partition_exprs && !_merge_info.delete_partition_exprs.empty()) { + _delete_partition_function = std::make_unique<IcebergDeletePartitionFunction>( + _partition_count, _hash_method(), _merge_info.delete_partition_exprs); + RETURN_IF_ERROR(_delete_partition_function->init({})); + } + return Status::OK(); +} + +Status MergePartitioner::prepare(RuntimeState* state, const RowDescriptor& row_desc) { + RETURN_IF_ERROR(VExpr::prepare(_operation_expr_ctxs, state, row_desc)); + if (_insert_partition_function != nullptr) { + RETURN_IF_ERROR(_insert_partition_function->prepare(state, row_desc)); + } + if (_delete_partition_function != nullptr) { + RETURN_IF_ERROR(_delete_partition_function->prepare(state, row_desc)); + } + return Status::OK(); +} + +Status MergePartitioner::open(RuntimeState* state) { + RETURN_IF_ERROR(VExpr::open(_operation_expr_ctxs, state)); + if (_insert_partition_function != nullptr) { + RETURN_IF_ERROR(_insert_partition_function->open(state)); + if (auto* insert_function = + dynamic_cast<IcebergInsertPartitionFunction*>(_insert_partition_function.get()); + insert_function != nullptr && insert_function->fallback_to_random()) { + _insert_random = true; + } + } + if (_delete_partition_function != nullptr) { + RETURN_IF_ERROR(_delete_partition_function->open(state)); + } + _init_insert_scaling(state); + return Status::OK(); +} + +Status MergePartitioner::close(RuntimeState* /*state*/) { + return Status::OK(); +} + +Status MergePartitioner::do_partitioning(RuntimeState* state, Block* block) const { + const size_t rows = block->rows(); + if (rows == 0) { + _channel_ids.clear(); + return Status::OK(); + } + + const size_t column_to_keep = block->columns(); + if (_operation_expr_ctxs.empty()) { + return 
Status::InternalError("Merge partitioning missing operation expression"); + } + + int op_idx = -1; + RETURN_IF_ERROR(_operation_expr_ctxs[0]->execute(block, &op_idx)); + if (op_idx < 0 || op_idx >= block->columns()) { + return Status::InternalError("Merge partitioning missing operation column"); + } + if (op_idx >= cast_set<int>(column_to_keep)) { + return Status::InternalError("Merge partitioning requires operation column in input block"); + } + + const auto& op_column = block->get_by_position(op_idx).column; + const auto* op_data = remove_nullable(op_column).get(); + std::vector<int8_t> ops(rows); + bool has_insert = false; + bool has_delete = false; + bool has_update = false; + for (size_t i = 0; i < rows; ++i) { + int8_t op = static_cast<int8_t>(op_data->get_int(i)); + ops[i] = op; + if (_is_insert_op(op)) { + has_insert = true; + } + if (_is_delete_op(op)) { + has_delete = true; + } + if (op == kUpdateOperation) { + has_update = true; + } + } + + if (has_insert && !_insert_random && _insert_partition_function == nullptr) { + return Status::InternalError("Merge partitioning insert exprs are empty"); + } + if (has_delete && _delete_partition_function == nullptr) { + return Status::InternalError("Merge partitioning delete exprs are empty"); + } + + std::vector<uint32_t> insert_hashes; + std::vector<uint32_t> delete_hashes; + const size_t insert_partition_count = + _enable_insert_rebalance ? 
_insert_partition_count : _partition_count; + if (has_insert && !_insert_random) { + RETURN_IF_ERROR(_insert_partition_function->get_partitions( + state, block, insert_partition_count, insert_hashes)); + } + if (has_delete) { + RETURN_IF_ERROR(_delete_partition_function->get_partitions(state, block, _partition_count, + delete_hashes)); + } + if (has_insert) { + if (_insert_random) { + if (_non_partition_scaling_threshold > 0) { + _insert_data_processed += static_cast<int64_t>(block->bytes()); + if (_insert_writer_count < static_cast<int>(_partition_count) && + _insert_data_processed >= + _insert_writer_count * _non_partition_scaling_threshold) { + _insert_writer_count++; + } + } else { + _insert_writer_count = static_cast<int>(_partition_count); + } + } else if (_enable_insert_rebalance) { + _apply_insert_rebalance(ops, insert_hashes, block->bytes()); + } + } + + Block::erase_useless_column(block, column_to_keep); + + _channel_ids.resize(rows); + for (size_t i = 0; i < rows; ++i) { + const int8_t op = ops[i]; + if (op == kUpdateOperation) { + _channel_ids[i] = delete_hashes[i]; + continue; + } + if (_is_insert_op(op)) { + _channel_ids[i] = _insert_random ? 
_next_rr_channel() : insert_hashes[i]; + } else if (_is_delete_op(op)) { + _channel_ids[i] = delete_hashes[i]; + } else { + return Status::InternalError("Unknown Iceberg merge operation {}", op); + } + } + + if (has_update) { + for (size_t col_idx = 0; col_idx < block->columns(); ++col_idx) { + block->replace_by_position_if_const(col_idx); + } + + MutableColumns mutable_columns = block->mutate_columns(); + MutableColumnPtr& op_mut = mutable_columns[op_idx]; + ColumnInt8* op_values_col = nullptr; + if (auto* nullable_col = check_and_get_column<ColumnNullable>(op_mut.get())) { + op_values_col = + check_and_get_column<ColumnInt8>(nullable_col->get_nested_column_ptr().get()); + } else { + op_values_col = check_and_get_column<ColumnInt8>(op_mut.get()); + } + if (op_values_col == nullptr) { + block->set_columns(std::move(mutable_columns)); + return Status::InternalError("Merge operation column must be tinyint"); + } + auto& op_values = op_values_col->get_data(); + const size_t original_rows = rows; + for (size_t row = 0; row < original_rows; ++row) { + if (ops[row] != kUpdateOperation) { + continue; + } + op_values[row] = kUpdateDeleteOperation; + for (size_t col_idx = 0; col_idx < mutable_columns.size(); ++col_idx) { + mutable_columns[col_idx]->insert_from(*mutable_columns[col_idx], row); + } + const size_t new_row_idx = op_values.size() - 1; + op_values[new_row_idx] = kUpdateInsertOperation; + const uint32_t insert_channel = + _insert_random ? _next_rr_channel() : insert_hashes[row]; + _channel_ids.push_back(insert_channel); Review Comment: **CRITICAL: Out-of-bounds vector access when `_insert_random` is true** `insert_hashes` is only populated when `has_insert && !_insert_random` (line 171). However, when `op == kUpdateOperation`, this line accesses `insert_hashes[row]` unconditionally via the ternary's false branch. If `_insert_random` is true, `insert_hashes` is empty, causing undefined behavior (out-of-bounds read). While the ternary condition `_insert_random ? 
_next_rr_channel() : insert_hashes[row]` reads `insert_hashes[row]` on its false branch, the C++ conditional operator is guaranteed to evaluate only the selected operand ([expr.cond]), and `_insert_random` is a member variable that does not change during the loop — so when `_insert_random` is true, `insert_hashes[row]` is never evaluated and the code is technically safe. It is fragile, however: if future refactoring changes the branching logic, or if `_insert_random` could ever differ between rows, this becomes an out-of-bounds read on an empty vector. Consider guarding the access with a `DCHECK(_insert_random || row < insert_hashes.size())` or populating `insert_hashes` unconditionally. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java: ########## @@ -294,7 +521,17 @@ public void rollback() { } public long getUpdateCnt() { - return commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum(); + long dataRows = 0; + long deleteRows = 0; + for (TIcebergCommitData commitData : commitDataList) { + if (commitData.isSetFileContent() + && commitData.getFileContent() == TFileContent.POSITION_DELETES) { + deleteRows += commitData.getRowCount(); + } else { + dataRows += commitData.getRowCount(); + } + } + return dataRows > 0 ? dataRows : deleteRows; } Review Comment: **MEDIUM: `getUpdateCnt()` returns misleading row counts** For a MERGE/UPDATE that produces both data files (inserts) and delete files: ```java return dataRows > 0 ? dataRows : deleteRows; ``` This reports only insert rows for mixed operations. For an UPDATE of 100 rows (which produces 100 position deletes + 100 new inserts), the user sees "100 rows affected" but the 100 deletes are invisible.
For a pure DELETE operation, only delete rows are reported. The semantics should be documented, or consider returning `Math.max(dataRows, deleteRows)` or a separate count for each operation type. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java: ########## @@ -226,6 +249,210 @@ private void updateManifestAfterRewrite() { Review Comment: **HIGH: NullPointerException on empty Iceberg table** `table.currentSnapshot()` returns `null` for a newly created table with no data. This line will throw NPE: ```java this.startingSnapshotId = table.currentSnapshot().snapshotId(); ``` Note that `getSnapshotIdIfPresent()` (line 587) already handles this case correctly. Consider using: ```java this.startingSnapshotId = getSnapshotIdIfPresent(table); ``` ########## be/src/exec/sink/viceberg_delete_sink.cpp: ########## @@ -0,0 +1,501 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "exec/sink/viceberg_delete_sink.h" + +#include <fmt/format.h> + +#include "common/logging.h" +#include "core/block/column_with_type_and_name.h" +#include "core/column/column_nullable.h" +#include "core/column/column_string.h" +#include "core/column/column_struct.h" +#include "core/column/column_vector.h" +#include "core/data_type/data_type_factory.hpp" +#include "core/data_type/data_type_nullable.h" +#include "core/data_type/data_type_number.h" +#include "core/data_type/data_type_string.h" +#include "core/data_type/data_type_struct.h" +#include "exprs/vexpr.h" +#include "format/transformer/vfile_format_transformer.h" +#include "runtime/runtime_state.h" +#include "util/string_util.h" +#include "util/uid_util.h" + +namespace doris { + +VIcebergDeleteSink::VIcebergDeleteSink(const TDataSink& t_sink, + const VExprContextSPtrs& output_exprs, + std::shared_ptr<Dependency> dep, + std::shared_ptr<Dependency> fin_dep) + : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) { + DCHECK(_t_sink.__isset.iceberg_delete_sink); +} + +Status VIcebergDeleteSink::init_properties(ObjectPool* pool) { + const auto& delete_sink = _t_sink.iceberg_delete_sink; + + _delete_type = delete_sink.delete_type; + if (_delete_type != TFileContent::POSITION_DELETES) { + return Status::NotSupported("Iceberg delete only supports position delete files"); + } + + // Get file format settings + if (delete_sink.__isset.file_format) { + _file_format_type = delete_sink.file_format; + } + + if (delete_sink.__isset.compress_type) { + _compress_type = delete_sink.compress_type; + } + + // Get output path and table location + if (delete_sink.__isset.output_path) { + _output_path = delete_sink.output_path; + } + + if (delete_sink.__isset.table_location) { + _table_location = delete_sink.table_location; + } + + // Get Hadoop configuration + if (delete_sink.__isset.hadoop_config) { + _hadoop_conf.insert(delete_sink.hadoop_config.begin(), delete_sink.hadoop_config.end()); + } + + if 
(delete_sink.__isset.file_type) { + _file_type = delete_sink.file_type; + } + + if (delete_sink.__isset.broker_addresses) { + _broker_addresses.assign(delete_sink.broker_addresses.begin(), + delete_sink.broker_addresses.end()); + } + + // Get partition information + if (delete_sink.__isset.partition_spec_id) { + _partition_spec_id = delete_sink.partition_spec_id; + } + + if (delete_sink.__isset.partition_data_json) { + _partition_data_json = delete_sink.partition_data_json; + } + + return Status::OK(); +} + +Status VIcebergDeleteSink::open(RuntimeState* state, RuntimeProfile* profile) { + _state = state; + + // Initialize counters + _written_rows_counter = ADD_COUNTER(profile, "RowsWritten", TUnit::UNIT); + _send_data_timer = ADD_TIMER(profile, "SendDataTime"); + _write_delete_files_timer = ADD_TIMER(profile, "WriteDeleteFilesTime"); + _delete_file_count_counter = ADD_COUNTER(profile, "DeleteFileCount", TUnit::UNIT); + _open_timer = ADD_TIMER(profile, "OpenTime"); + _close_timer = ADD_TIMER(profile, "CloseTime"); + + SCOPED_TIMER(_open_timer); + + RETURN_IF_ERROR(_init_position_delete_output_exprs()); + + LOG(INFO) << fmt::format("VIcebergDeleteSink opened: delete_type={}, output_path={}", + to_string(_delete_type), _output_path); + + return Status::OK(); +} + +Status VIcebergDeleteSink::write(RuntimeState* state, Block& block) { + SCOPED_TIMER(_send_data_timer); + + if (block.rows() == 0) { + return Status::OK(); + } + + _row_count += block.rows(); + + if (_delete_type != TFileContent::POSITION_DELETES) { + return Status::NotSupported("Iceberg delete only supports position delete files"); + } + + // Extract $row_id column and group by file_path + RETURN_IF_ERROR(_collect_position_deletes(block, _file_deletions)); + + if (_written_rows_counter) { + COUNTER_UPDATE(_written_rows_counter, block.rows()); + } + + return Status::OK(); +} + +Status VIcebergDeleteSink::close(Status close_status) { + SCOPED_TIMER(_close_timer); + + if (!close_status.ok()) { + LOG(WARNING) 
<< fmt::format("VIcebergDeleteSink close with error: {}", + close_status.to_string()); + return close_status; + } + + if (_delete_type == TFileContent::POSITION_DELETES && !_file_deletions.empty()) { + SCOPED_TIMER(_write_delete_files_timer); + RETURN_IF_ERROR(_write_position_delete_files(_file_deletions)); + } + + // Update counters + if (_delete_file_count_counter) { + COUNTER_UPDATE(_delete_file_count_counter, _delete_file_count); + } + + LOG(INFO) << fmt::format("VIcebergDeleteSink closed: rows={}, delete_files={}", _row_count, + _delete_file_count); + + if (_state != nullptr) { + for (const auto& commit_data : _commit_data_list) { + _state->add_iceberg_commit_datas(commit_data); + } + } + + return Status::OK(); +} + +int VIcebergDeleteSink::_get_row_id_column_index(const Block& block) { + // Find __DORIS_ICEBERG_ROWID_COL__ column in block + for (size_t i = 0; i < block.columns(); ++i) { + const auto& col_name = block.get_by_position(i).name; + if (col_name == doris::BeConsts::ICEBERG_ROWID_COL) { + return static_cast<int>(i); + } + } + return -1; +} + +Status VIcebergDeleteSink::_collect_position_deletes( + const Block& block, std::map<std::string, IcebergFileDeletion>& file_deletions) { + // Find row id column + int row_id_col_idx = _get_row_id_column_index(block); + if (row_id_col_idx < 0) { + return Status::InternalError( + "__DORIS_ICEBERG_ROWID_COL__ column not found in block for position delete"); + } + + const auto& row_id_col = block.get_by_position(row_id_col_idx); + const IColumn* row_id_data = row_id_col.column.get(); + const IDataType* row_id_type = row_id_col.type.get(); + const auto* nullable_col = check_and_get_column<ColumnNullable>(row_id_data); + if (nullable_col != nullptr) { + row_id_data = nullable_col->get_nested_column_ptr().get(); + } + const auto* nullable_type = check_and_get_data_type<DataTypeNullable>(row_id_type); + if (nullable_type != nullptr) { + row_id_type = nullable_type->get_nested_type().get(); + } + const auto* struct_col 
= check_and_get_column<ColumnStruct>(row_id_data); + const auto* struct_type = check_and_get_data_type<DataTypeStruct>(row_id_type); + if (!struct_col || !struct_type) { + return Status::InternalError("__DORIS_ICEBERG_ROWID_COL__ column is not a struct column"); + } + + // __DORIS_ICEBERG_ROWID_COL__ struct: + // (file_path: STRING, row_position: BIGINT, partition_spec_id: INT, partition_data: STRING) + size_t field_count = struct_col->tuple_size(); + if (field_count < 2) { + return Status::InternalError( + "__DORIS_ICEBERG_ROWID_COL__ struct must have at least 2 fields " + "(file_path, row_position)"); + } + + auto normalize = [](const std::string& name) { return doris::to_lower(name); }; + + int file_path_idx = -1; + int row_position_idx = -1; + int spec_id_idx = -1; + int partition_data_idx = -1; + const auto& field_names = struct_type->get_element_names(); + for (size_t i = 0; i < field_names.size(); ++i) { + std::string name = normalize(field_names[i]); + if (file_path_idx < 0 && name == "file_path") { + file_path_idx = static_cast<int>(i); + } else if (row_position_idx < 0 && name == "row_position") { + row_position_idx = static_cast<int>(i); + } else if (spec_id_idx < 0 && name == "partition_spec_id") { + spec_id_idx = static_cast<int>(i); + } else if (partition_data_idx < 0 && name == "partition_data") { + partition_data_idx = static_cast<int>(i); + } + } + + if (file_path_idx < 0 || row_position_idx < 0) { + return Status::InternalError( + "__DORIS_ICEBERG_ROWID_COL__ must contain standard fields file_path and " + "row_position"); + } + if (field_count >= 3 && spec_id_idx < 0) { + return Status::InternalError( + "__DORIS_ICEBERG_ROWID_COL__ must use standard field name partition_spec_id"); + } + if (field_count >= 4 && partition_data_idx < 0) { + return Status::InternalError( + "__DORIS_ICEBERG_ROWID_COL__ must use standard field name partition_data"); + } + + const auto* file_path_col = check_and_get_column<ColumnString>( + 
remove_nullable(struct_col->get_column_ptr(file_path_idx)).get()); + const auto* row_position_col = check_and_get_column<ColumnVector<TYPE_BIGINT>>( + remove_nullable(struct_col->get_column_ptr(row_position_idx)).get()); + + if (!file_path_col || !row_position_col) { + return Status::InternalError( + "__DORIS_ICEBERG_ROWID_COL__ struct fields have incorrect types"); + } + + const ColumnVector<TYPE_INT>* spec_id_col = nullptr; + const ColumnString* partition_data_col = nullptr; + if (spec_id_idx >= 0 && spec_id_idx < static_cast<int>(field_count)) { + spec_id_col = check_and_get_column<ColumnVector<TYPE_INT>>( + remove_nullable(struct_col->get_column_ptr(spec_id_idx)).get()); + if (!spec_id_col) { + return Status::InternalError( + "__DORIS_ICEBERG_ROWID_COL__ partition_spec_id has incorrect type"); + } + } + if (partition_data_idx >= 0 && partition_data_idx < static_cast<int>(field_count)) { + partition_data_col = check_and_get_column<ColumnString>( + remove_nullable(struct_col->get_column_ptr(partition_data_idx)).get()); + if (!partition_data_col) { + return Status::InternalError( + "__DORIS_ICEBERG_ROWID_COL__ partition_data has incorrect type"); + } + } + + // Group by file_path using roaring bitmap + for (size_t i = 0; i < block.rows(); ++i) { + std::string file_path = file_path_col->get_data_at(i).to_string(); + int64_t row_position = row_position_col->get_element(i); + if (row_position < 0) { + return Status::InternalError("Invalid row_position {} in row_id column", row_position); + } + + int32_t partition_spec_id = _partition_spec_id; + std::string partition_data_json = _partition_data_json; + if (spec_id_col != nullptr) { + partition_spec_id = spec_id_col->get_element(i); + } + if (partition_data_col != nullptr) { + partition_data_json = partition_data_col->get_data_at(i).to_string(); + } + + auto [iter, inserted] = file_deletions.emplace( + file_path, IcebergFileDeletion(partition_spec_id, partition_data_json)); + if (!inserted) { + if 
(iter->second.partition_spec_id != partition_spec_id || + iter->second.partition_data_json != partition_data_json) { + LOG(WARNING) << fmt::format( + "Mismatched partition info for file {}, existing spec_id={}, data={}, " + "new spec_id={}, data={}", + file_path, iter->second.partition_spec_id, iter->second.partition_data_json, + partition_spec_id, partition_data_json); + } + } + iter->second.rows_to_delete.add(static_cast<uint64_t>(row_position)); + } + + return Status::OK(); +} + +Status VIcebergDeleteSink::_write_position_delete_files( + const std::map<std::string, IcebergFileDeletion>& file_deletions) { + constexpr size_t kBatchSize = 4096; + for (const auto& [data_file_path, deletion] : file_deletions) { + if (deletion.rows_to_delete.isEmpty()) { + continue; + } + // Generate unique delete file path + std::string delete_file_path = _generate_delete_file_path(data_file_path); + + // Create delete file writer + auto writer = VIcebergDeleteFileWriterFactory::create_writer( + TFileContent::POSITION_DELETES, delete_file_path, _file_format_type, + _compress_type); + + // Build column names for position delete + std::vector<std::string> column_names = {"file_path", "pos"}; + + if (_position_delete_output_expr_ctxs.empty()) { + RETURN_IF_ERROR(_init_position_delete_output_exprs()); + } + + // Open writer + RETURN_IF_ERROR(writer->open(_state, _state->runtime_profile(), + _position_delete_output_expr_ctxs, column_names, _hadoop_conf, + _file_type, _broker_addresses)); + + // Build block with (file_path, pos) columns + std::vector<int64_t> positions; + positions.reserve(kBatchSize); + for (auto it = deletion.rows_to_delete.begin(); it != deletion.rows_to_delete.end(); ++it) { + positions.push_back(static_cast<int64_t>(*it)); + if (positions.size() >= kBatchSize) { + Block delete_block; + RETURN_IF_ERROR( + _build_position_delete_block(data_file_path, positions, delete_block)); + RETURN_IF_ERROR(writer->write(delete_block)); + positions.clear(); + } + } + if 
(!positions.empty()) { + Block delete_block; + RETURN_IF_ERROR(_build_position_delete_block(data_file_path, positions, delete_block)); + RETURN_IF_ERROR(writer->write(delete_block)); + } + + // Set partition info on writer before close + writer->set_partition_info(deletion.partition_spec_id, deletion.partition_data_json); + + // Close writer and collect commit data + TIcebergCommitData commit_data; + RETURN_IF_ERROR(writer->close(commit_data)); + + // Set referenced data file path + commit_data.__set_referenced_data_file_path(data_file_path); + + _commit_data_list.push_back(commit_data); + _delete_file_count++; + + VLOG(1) << fmt::format("Written position delete file: path={}, rows={}, referenced_file={}", + delete_file_path, commit_data.row_count, data_file_path); + } + + return Status::OK(); +} + +Status VIcebergDeleteSink::_init_position_delete_output_exprs() { + if (!_position_delete_output_expr_ctxs.empty()) { + return Status::OK(); + } + + std::vector<TExpr> texprs; + texprs.reserve(2); + + std::string empty_string; + TExprNode file_path_node = + create_texpr_node_from(&empty_string, PrimitiveType::TYPE_STRING, 0, 0); + file_path_node.__set_num_children(0); + file_path_node.__set_output_scale(0); + file_path_node.__set_is_nullable(false); + TExpr file_path_expr; + file_path_expr.nodes.emplace_back(std::move(file_path_node)); + texprs.emplace_back(std::move(file_path_expr)); + + int64_t zero = 0; + TExprNode pos_node = create_texpr_node_from(&zero, PrimitiveType::TYPE_BIGINT, 0, 0); + pos_node.__set_num_children(0); + pos_node.__set_output_scale(0); + pos_node.__set_is_nullable(false); + TExpr pos_expr; + pos_expr.nodes.emplace_back(std::move(pos_node)); + texprs.emplace_back(std::move(pos_expr)); + + RETURN_IF_ERROR(VExpr::create_expr_trees(texprs, _position_delete_output_expr_ctxs)); + return Status::OK(); +} + +Status VIcebergDeleteSink::_build_position_delete_block(const std::string& file_path, + const std::vector<int64_t>& positions, + Block& 
output_block) { + // Create file_path column (repeated for each position) + auto file_path_col = ColumnString::create(); + for (size_t i = 0; i < positions.size(); ++i) { + file_path_col->insert_data(file_path.data(), file_path.size()); + } + + // Create pos column + auto pos_col = ColumnVector<TYPE_BIGINT>::create(); + pos_col->get_data().assign(positions.begin(), positions.end()); + + // Build block + output_block.insert(ColumnWithTypeAndName(std::move(file_path_col), + std::make_shared<DataTypeString>(), "file_path")); + output_block.insert( + ColumnWithTypeAndName(std::move(pos_col), std::make_shared<DataTypeInt64>(), "pos")); + + return Status::OK(); +} + +std::string VIcebergDeleteSink::_get_file_extension() const { + std::string compress_name; + switch (_compress_type) { + case TFileCompressType::SNAPPYBLOCK: { + compress_name = ".snappy"; + break; + } + case TFileCompressType::ZLIB: { + compress_name = ".zlib"; + break; + } + case TFileCompressType::ZSTD: { + compress_name = ".zstd"; + break; + } + default: { + compress_name = ""; + break; + } + } + + std::string file_format_name; + switch (_file_format_type) { + case TFileFormatType::FORMAT_PARQUET: { + file_format_name = ".parquet"; + break; + } + case TFileFormatType::FORMAT_ORC: { + file_format_name = ".orc"; + break; + } + default: { + file_format_name = ""; + break; + } + } + return fmt::format("{}{}", compress_name, file_format_name); +} + +std::string VIcebergDeleteSink::_generate_delete_file_path( + const std::string& referenced_data_file) { Review Comment: **MEDIUM: UUID truncated to 8 characters — collision risk** The generated delete file name uses `uuid.substr(0, 8)` which is only 8 hex characters (32 bits of entropy). Combined with `std::hash % 10000000`, there's a non-trivial collision risk in high-throughput concurrent environments where many delete files are written simultaneously. Consider using the full UUID or at least a significantly longer substring (e.g., 16+ characters). 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
