This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 4428db37b3884373482071fc918936b0c080e47c Author: Zoltan Borok-Nagy <[email protected]> AuthorDate: Fri Mar 1 20:49:11 2024 +0100 IMPALA-12860: Invoke validateDataFilesExist for RowDelta operations We must invoke validateDataFilesExist for RowDelta operations (DELETE/ UPDATE/MERGE). Without this a concurrent RewriteFiles (compaction) and RowDelta can corrupt a table. IcebergBufferedDeleteSink now also collects the filenames of the data files that are referenced in the position delete files. It adds them to the DML exec state which is then collected by the Coordinator. The Coordinator passes the file paths to CatalogD which executes Iceberg's RowDelta operation and now invokes validateDataFilesExist() with the file paths. Additionally it also invokes validateDeletedFiles(). This patch set also resolves IMPALA-12640 which is about replacing IcebergDeleteSink with IcebergBufferedDeleteSink, as from now on we use the buffered version for all DML operations that write position delete files. 
Testing: * adds new stress test with DELETE + UPDATE + OPTIMIZE Change-Id: I4869eb863ff0afe8f691ccf2fd681a92d36b405c Reviewed-on: http://gerrit.cloudera.org:8080/21099 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Gabor Kaszab <[email protected]> --- be/src/exec/CMakeLists.txt | 1 - be/src/exec/iceberg-buffered-delete-sink.cc | 18 ++ be/src/exec/iceberg-buffered-delete-sink.h | 3 + be/src/exec/iceberg-delete-sink-config.cc | 12 +- be/src/exec/iceberg-delete-sink.cc | 245 --------------------- be/src/exec/iceberg-delete-sink.h | 93 -------- be/src/exec/multi-table-sink.cc | 3 +- be/src/runtime/dml-exec-state.cc | 7 + be/src/runtime/dml-exec-state.h | 21 ++ be/src/service/client-request-state.cc | 4 + common/protobuf/control_service.proto | 4 + common/thrift/CatalogService.thrift | 3 + common/thrift/DataSinks.thrift | 4 +- .../apache/impala/analysis/IcebergDeleteImpl.java | 5 +- .../impala/planner/IcebergBufferedDeleteSink.java | 1 - .../apache/impala/planner/IcebergDeleteSink.java | 145 ------------ .../java/org/apache/impala/planner/TableSink.java | 5 +- .../impala/service/IcebergCatalogOpExecutor.java | 8 +- .../queries/PlannerTest/iceberg-v2-delete.test | 88 +++----- tests/stress/test_update_stress.py | 73 ++++-- 20 files changed, 163 insertions(+), 580 deletions(-) diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index e6561045e..a9c5c61de 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -81,7 +81,6 @@ add_library(Exec iceberg-buffered-delete-sink.cc iceberg-delete-builder.cc iceberg-delete-node.cc - iceberg-delete-sink.cc iceberg-delete-sink-base.cc iceberg-delete-sink-config.cc incr-stats-util.cc diff --git a/be/src/exec/iceberg-buffered-delete-sink.cc b/be/src/exec/iceberg-buffered-delete-sink.cc index 4743d4065..b7fdfd08c 100644 --- a/be/src/exec/iceberg-buffered-delete-sink.cc +++ b/be/src/exec/iceberg-buffered-delete-sink.cc @@ -385,9 +385,27 @@ Status 
IcebergBufferedDeleteSink::FlushFinal(RuntimeState* state) { VLogBufferedRecords(); RETURN_IF_ERROR(VerifyBufferedRecords()); RETURN_IF_ERROR(FlushBufferedRecords(state)); + RegisterDataFilesInDmlExecState(); return Status::OK(); } +void IcebergBufferedDeleteSink::RegisterDataFilesInDmlExecState() { + int capacity = 0; + for (const auto& entry : partitions_to_file_positions_) { + const FilePositions& file_positions = entry.second; + capacity += file_positions.size(); + } + dml_exec_state_.reserveReferencedDataFiles(capacity); + for (const auto& entry : partitions_to_file_positions_) { + const FilePositions& file_positions = entry.second; + for (const auto& file_pos_entry : file_positions) { + const StringValue& sv = file_pos_entry.first; + string filepath(sv.Ptr(), sv.Len()); + dml_exec_state_.addReferencedDataFile(std::move(filepath)); + } + } +} + void IcebergBufferedDeleteSink::Close(RuntimeState* state) { if (closed_) return; SCOPED_TIMER(profile()->total_time_counter()); diff --git a/be/src/exec/iceberg-buffered-delete-sink.h b/be/src/exec/iceberg-buffered-delete-sink.h index fd51bf6c2..bdf3920ae 100644 --- a/be/src/exec/iceberg-buffered-delete-sink.h +++ b/be/src/exec/iceberg-buffered-delete-sink.h @@ -90,6 +90,9 @@ class IcebergBufferedDeleteSink : public IcebergDeleteSinkBase { /// Writes all buffered delete records to position delete files. Status FlushBufferedRecords(RuntimeState* state); + /// Registers the referenced data files in dml_exec_state_ + void RegisterDataFilesInDmlExecState(); + /// Initializes an empty output batch. 
Status InitializeOutputRowBatch(RowBatch* batch); diff --git a/be/src/exec/iceberg-delete-sink-config.cc b/be/src/exec/iceberg-delete-sink-config.cc index 0ab358915..6cf93e833 100644 --- a/be/src/exec/iceberg-delete-sink-config.cc +++ b/be/src/exec/iceberg-delete-sink-config.cc @@ -20,7 +20,6 @@ #include "common/object-pool.h" #include "common/status.h" #include "exec/iceberg-buffered-delete-sink.h" -#include "exec/iceberg-delete-sink.h" #include "exprs/scalar-expr.h" #include "runtime/mem-pool.h" @@ -28,13 +27,8 @@ namespace impala { DataSink* IcebergDeleteSinkConfig::CreateSink(RuntimeState* state) const { TDataSinkId sink_id = state->fragment().idx; - if (this->tsink_->table_sink.iceberg_delete_sink.is_buffered) { - return state->obj_pool()->Add( - new IcebergBufferedDeleteSink(sink_id, *this, state)); - } else { - return state->obj_pool()->Add( - new IcebergDeleteSink(sink_id, *this, state)); - } + return state->obj_pool()->Add( + new IcebergBufferedDeleteSink(sink_id, *this, state)); } Status IcebergDeleteSinkConfig::Init( @@ -48,4 +42,4 @@ Status IcebergDeleteSinkConfig::Init( return Status::OK(); } -} \ No newline at end of file +} diff --git a/be/src/exec/iceberg-delete-sink.cc b/be/src/exec/iceberg-delete-sink.cc deleted file mode 100644 index 657d75869..000000000 --- a/be/src/exec/iceberg-delete-sink.cc +++ /dev/null @@ -1,245 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/iceberg-delete-sink.h" - -#include "common/object-pool.h" -#include "exec/iceberg-delete-sink-config.h" -#include "exec/parquet/hdfs-parquet-table-writer.h" -#include "exprs/scalar-expr.h" -#include "exprs/scalar-expr-evaluator.h" -#include "runtime/descriptors.h" -#include "runtime/hdfs-fs-cache.h" -#include "runtime/mem-tracker.h" -#include "runtime/row-batch.h" -#include "runtime/runtime-state.h" -#include "util/coding-util.h" -#include "util/hdfs-util.h" -#include "util/impalad-metrics.h" -#include "util/metrics.h" -#include "util/runtime-profile-counters.h" - -#include "common/names.h" - -namespace impala { - -IcebergDeleteSink::IcebergDeleteSink(TDataSinkId sink_id, - const IcebergDeleteSinkConfig& sink_config, RuntimeState* state) : - IcebergDeleteSinkBase(sink_id, sink_config, "IcebergDeleteSink", state) { -} - -Status IcebergDeleteSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) { - SCOPED_TIMER(profile()->total_time_counter()); - RETURN_IF_ERROR(IcebergDeleteSinkBase::Prepare(state, parent_mem_tracker)); - return Status::OK(); -} - -Status IcebergDeleteSink::Open(RuntimeState* state) { - SCOPED_TIMER(profile()->total_time_counter()); - RETURN_IF_ERROR(IcebergDeleteSinkBase::Open(state)); - return Status::OK(); -} - -Status IcebergDeleteSink::Send(RuntimeState* state, RowBatch* batch) { - SCOPED_TIMER(profile()->total_time_counter()); - expr_results_pool_->Clear(); - RETURN_IF_ERROR(state->CheckQueryState()); - // We don't do any work for an empty batch. 
- if (batch->num_rows() == 0) return Status::OK(); - - RETURN_IF_ERROR(VerifyRowsNotDuplicated(batch)); - - // If there are no partition keys then just pass the whole batch to one partition. - if (dynamic_partition_key_expr_evals_.empty()) { - if (current_partition_.first == nullptr) { - RETURN_IF_ERROR(SetCurrentPartition(state, nullptr, ROOT_PARTITION_KEY)); - } - DCHECK(current_partition_.second.empty()); - RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_partition_.first.get())); - } else { - RETURN_IF_ERROR(WriteClusteredRowBatch(state, batch)); - } - return Status::OK(); -} - -Status IcebergDeleteSink::VerifyRowsNotDuplicated(RowBatch* batch) { - DCHECK_EQ(output_exprs_.size(), 2); - DCHECK_EQ(output_expr_evals_.size(), 2); - - ScalarExpr* filepath_expr = output_exprs_[0]; - ScalarExpr* position_expr = output_exprs_[1]; - DCHECK(filepath_expr->type().IsStringType()); - DCHECK(position_expr->type().IsIntegerType()); - - ScalarExprEvaluator* filepath_eval = output_expr_evals_[0]; - ScalarExprEvaluator* position_eval = output_expr_evals_[1]; - for (int i = 0; i < batch->num_rows(); ++i) { - TupleRow* row = batch->GetRow(i); - StringVal filepath_sv = filepath_eval->GetStringVal(row); - DCHECK(!filepath_sv.is_null); - BigIntVal position_bi = position_eval->GetBigIntVal(row); - DCHECK(!position_bi.is_null); - string filepath(reinterpret_cast<char*>(filepath_sv.ptr), filepath_sv.len); - int64_t position = position_bi.val; - if (prev_file_path_ == filepath && prev_position_ == position) { - return Status(Substitute("Duplicated row in DELETE sink. file_path='$0', pos='$1'. 
" - "If this is coming from an UPDATE statement with a JOIN, please check if there " - "multiple matches in the JOIN condition.", filepath, position)); - } - prev_file_path_ = filepath; - prev_position_ = position; - } - return Status::OK(); -} - -inline Status IcebergDeleteSink::SetCurrentPartition(RuntimeState* state, - const TupleRow* row, const string& key) { - DCHECK(row != nullptr || key == ROOT_PARTITION_KEY); - if (current_partition_.first != nullptr && - key == current_clustered_partition_key_) { - return Status::OK(); - } - - current_partition_.first.reset(new OutputPartition()); - current_partition_.second.clear(); - // Build the unique name for this partition from the partition keys, e.g. "j=1/f=foo/" - // etc. - RETURN_IF_ERROR(ConstructPartitionInfo(row, current_partition_.first.get())); - Status status = InitOutputPartition(state, *prototype_partition_, - current_partition_.first.get(), false); - if (!status.ok()) { - // We failed to create the output partition successfully. Clean it up now. - if (current_partition_.first->writer != nullptr) { - current_partition_.first->writer->Close(); - } - return status; - } - - // Save the partition name so that the coordinator can create the partition - // directory structure if needed. - state->dml_exec_state()->AddPartition( - current_partition_.first->partition_name, prototype_partition_->id(), - &table_desc_->hdfs_base_dir(), - nullptr); - return Status::OK(); -} - -Status IcebergDeleteSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch) { - DCHECK_GT(batch->num_rows(), 0); - DCHECK_EQ(partition_key_expr_evals_.size(), 2); - DCHECK(!dynamic_partition_key_expr_evals_.empty()); - - // Initialize the clustered partition and key. 
- if (current_partition_.first == nullptr) { - TupleRow* current_row = batch->GetRow(0); - GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, - &current_clustered_partition_key_); - RETURN_IF_ERROR(SetCurrentPartition(state, current_row, - current_clustered_partition_key_)); - } - - // Compare the last row of the batch to the last current partition key. If they match, - // then all the rows in the batch have the same key and can be written as a whole. - string last_row_key; - GetHashTblKey(batch->GetRow(batch->num_rows() - 1), - dynamic_partition_key_expr_evals_, &last_row_key); - if (last_row_key == current_clustered_partition_key_) { - DCHECK(current_partition_.second.empty()); - RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_partition_.first.get())); - return Status::OK(); - } - - // Not all rows in this batch match the previously written partition key, so we process - // them individually. - for (int i = 0; i < batch->num_rows(); ++i) { - TupleRow* current_row = batch->GetRow(i); - - string key; - GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, &key); - - if (current_clustered_partition_key_ != key) { - DCHECK(current_partition_.first->writer != nullptr); - // Done with previous partition - write rows and close.
- if (!current_partition_.second.empty()) { - RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_partition_.first.get(), - current_partition_.second)); - current_partition_.second.clear(); - } - RETURN_IF_ERROR(FinalizePartitionFile(state, - current_partition_.first.get(), /*is_delete=*/true)); - if (current_partition_.first->writer.get() != nullptr) { - current_partition_.first->writer->Close(); - } - RETURN_IF_ERROR(SetCurrentPartition(state, current_row, key)); - current_clustered_partition_key_ = std::move(key); - } -#ifdef DEBUG - string debug_row_key; - GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, &debug_row_key); - DCHECK_EQ(current_clustered_partition_key_, debug_row_key); -#endif - DCHECK(current_partition_.first->writer != nullptr); - current_partition_.second.push_back(i); - } - // Write final set of rows to the partition but keep its file open. - RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_partition_.first.get(), - current_partition_.second)); - current_partition_.second.clear(); - return Status::OK(); -} - -Status IcebergDeleteSink::FlushFinal(RuntimeState* state) { - DCHECK(!closed_); - SCOPED_TIMER(profile()->total_time_counter()); - - if (current_partition_.first != nullptr) { - RETURN_IF_ERROR(FinalizePartitionFile(state, current_partition_.first.get(), - /*is_delete=*/true)); - } - return Status::OK(); -} - -void IcebergDeleteSink::Close(RuntimeState* state) { - if (closed_) return; - SCOPED_TIMER(profile()->total_time_counter()); - - if (current_partition_.first != nullptr) { - if (current_partition_.first->writer != nullptr) { - current_partition_.first->writer->Close(); - } - Status close_status = ClosePartitionFile(state, current_partition_.first.get()); - if (!close_status.ok()) state->LogError(close_status.msg()); - } - - current_partition_.first.reset(); - IcebergDeleteSinkBase::Close(state); - DCHECK(closed_); -} - -string IcebergDeleteSink::DebugString() const { - stringstream out; - out << 
"IcebergDeleteSink(" - << " table_desc=" << table_desc_->DebugString() - << " output_exprs=" << ScalarExpr::DebugString(output_exprs_); - if (!partition_key_exprs_.empty()) { - out << " partition_key_exprs=" << ScalarExpr::DebugString(partition_key_exprs_); - } - out << ")"; - return out.str(); -} - -} // namespace impala diff --git a/be/src/exec/iceberg-delete-sink.h b/be/src/exec/iceberg-delete-sink.h deleted file mode 100644 index 9be88d294..000000000 --- a/be/src/exec/iceberg-delete-sink.h +++ /dev/null @@ -1,93 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include "exec/iceberg-delete-sink-base.h" -#include "exec/output-partition.h" -#include "exec/table-sink-base.h" - -#include <unordered_map> - -namespace impala { - -class Expr; -class IcebergDeleteSinkConfig; -class TupleDescriptor; -class TupleRow; -class MemTracker; - -class IcebergDeleteSink : public IcebergDeleteSinkBase { - public: - IcebergDeleteSink(TDataSinkId sink_id, const IcebergDeleteSinkConfig& sink_config, - RuntimeState* state); - - /// Prepares output_exprs and partition_key_exprs, and connects to HDFS. 
- Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override; - - /// Opens output_exprs and partition_key_exprs. - Status Open(RuntimeState* state) override; - - /// Append all rows in batch to position delete files. It is assumed that - /// that rows are ordered by partitions, filepaths, and positions. - Status Send(RuntimeState* state, RowBatch* batch) override; - - /// Finalize any open files. - /// TODO: IMPALA-2988: Move calls to functions that can fail in Close() to FlushFinal() - Status FlushFinal(RuntimeState* state) override; - - /// Closes writers, output_exprs and partition_key_exprs and releases resources. - void Close(RuntimeState* state) override; - - std::string DebugString() const override; - - private: - /// Verifies that the row batch does not contain duplicated rows. This can only happen - /// in the context of UPDATE FROM statements when we are updating a table based on - /// another table, e.g.: - /// UPDATE t SET t.x = s.x FROM ice_t t, source_tbl s where t.id = s.id; - /// Now, if 'source_tbl' has duplicate rows then the JOIN operator would produce - /// multiple matches for the same row, and we would insert them to the table. - /// Therefore, we should always raise an error if we find duplicated rows (i.e rows - /// having the same filepath + position), because that would corrupt the table data - /// and the delete files as well. - /// For a case where deduplication is not possible at the sink level, see the comment - /// in IcebergUpdateImpl.buildAndValidateSelectExprs() in the Frontend Java code. - Status VerifyRowsNotDuplicated(RowBatch* batch); - - /// Maps all rows in 'batch' to partitions and appends them to their temporary Hdfs - /// files. The input must be ordered by the partition key expressions. - Status WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch) WARN_UNUSED_RESULT; - - /// Sets and initializes the 'current_partition_' based on key. 
For unpartitioned tables - /// it is only invoked once to initialize the only output partition. - /// For partitioned tables the rows are clustered based on partition data, i.e. when the - /// key changes we initialize a new output partition. - Status SetCurrentPartition(RuntimeState* state, const TupleRow* row, - const std::string& key) WARN_UNUSED_RESULT; - - /// The sink writes partitions one-by-one. - PartitionPair current_partition_; - - /// Variables necessary for validating that row batches don't contain duplicates. - std::string prev_file_path_; - int64_t prev_position_ = -1; -}; - -} - - diff --git a/be/src/exec/multi-table-sink.cc b/be/src/exec/multi-table-sink.cc index 3dd4b44c0..c37e9cf2d 100644 --- a/be/src/exec/multi-table-sink.cc +++ b/be/src/exec/multi-table-sink.cc @@ -17,7 +17,6 @@ #include "common/object-pool.h" #include "exec/hdfs-table-sink.h" -#include "exec/iceberg-delete-sink.h" #include "exec/multi-table-sink.h" #include "runtime/fragment-state.h" #include "runtime/runtime-state.h" @@ -95,4 +94,4 @@ void MultiTableSink::Close(RuntimeState* state) { DCHECK(closed_); } -} \ No newline at end of file +} diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc index acf88ccdf..49e7f05bc 100644 --- a/be/src/runtime/dml-exec-state.cc +++ b/be/src/runtime/dml-exec-state.cc @@ -133,6 +133,10 @@ void DmlExecState::Update(const DmlExecStatusPB& dml_exec_status) { files_to_move_[file.staging_path()] = file.final_path(); } } + data_files_referenced_by_position_deletes_.insert( + data_files_referenced_by_position_deletes_.end(), + dml_exec_status.data_files_referenced_by_position_deletes().begin(), + dml_exec_status.data_files_referenced_by_position_deletes().end()); } uint64_t DmlExecState::GetKuduLatestObservedTimestamp() { @@ -436,6 +440,9 @@ void DmlExecState::ToProto(DmlExecStatusPB* dml_status) { for (const PartitionStatusMap::value_type& part : per_partition_status_) { 
(*dml_status->mutable_per_partition_status())[part.first] = part.second; } + *dml_status->mutable_data_files_referenced_by_position_deletes() = + {data_files_referenced_by_position_deletes_.begin(), + data_files_referenced_by_position_deletes_.end()}; } void DmlExecState::ToTDmlResult(TDmlResult* dml_result) { diff --git a/be/src/runtime/dml-exec-state.h b/be/src/runtime/dml-exec-state.h index da9f4ff3d..9c6a4e87f 100644 --- a/be/src/runtime/dml-exec-state.h +++ b/be/src/runtime/dml-exec-state.h @@ -20,6 +20,7 @@ #include <map> #include <mutex> #include <string> +#include <vector> #include <boost/unordered_map.hpp> #include "common/hdfs.h" @@ -131,6 +132,22 @@ class DmlExecState { // Encodes delete file list info in flatbuffer format expected by Iceberg API. std::vector<std::string> CreateIcebergDeleteFilesVector(); + // Returns vector of Iceberg data files referenced by position delete records by + // this DML statement. + const std::vector<std::string>& DataFilesReferencedByPositionDeletes() const { + return data_files_referenced_by_position_deletes_; + } + + // Reserves capacity for 'data_files_referenced_by_position_deletes_'. + void reserveReferencedDataFiles(int capacity) { + data_files_referenced_by_position_deletes_.reserve(capacity); + } + + // Adds file_path to the list of data files referenced by position delete records. + void addReferencedDataFile(std::string&& file_path) { + data_files_referenced_by_position_deletes_.emplace_back(std::move(file_path)); + } + private: /// Auxiliary function used by 'AddCreatedFile' and 'AddCreatedDeleteFile'. void AddFileAux(const OutputPartition& partition, bool is_iceberg, @@ -152,6 +169,10 @@ class DmlExecState { typedef std::map<std::string, std::string> FileMoveMap; FileMoveMap files_to_move_; + /// In case of Iceberg modify statements it contains the data files referenced + /// by position delete records. 
+ std::vector<std::string> data_files_referenced_by_position_deletes_; + /// Determines what the permissions of directories created by INSERT statements should /// be if permission inheritance is enabled. Populates a map from all prefixes of /// 'path_str' (including the full path itself) which is a path in Hdfs, to pairs diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc index d68590125..26f6c1139 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -1630,12 +1630,16 @@ bool ClientRequestState::CreateIcebergCatalogOps( } else if (ice_finalize_params.operation == TIcebergOperation::DELETE) { cat_ice_op->__set_iceberg_delete_files_fb( dml_exec_state->CreateIcebergDeleteFilesVector()); + cat_ice_op->__set_data_files_referenced_by_position_deletes( + dml_exec_state->DataFilesReferencedByPositionDeletes()); if (cat_ice_op->iceberg_delete_files_fb.empty()) update_catalog = false; } else if (ice_finalize_params.operation == TIcebergOperation::UPDATE) { cat_ice_op->__set_iceberg_data_files_fb( dml_exec_state->CreateIcebergDataFilesVector()); cat_ice_op->__set_iceberg_delete_files_fb( dml_exec_state->CreateIcebergDeleteFilesVector()); + cat_ice_op->__set_data_files_referenced_by_position_deletes( + dml_exec_state->DataFilesReferencedByPositionDeletes()); if (cat_ice_op->iceberg_delete_files_fb.empty()) { DCHECK(cat_ice_op->iceberg_data_files_fb.empty()); update_catalog = false; diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto index 78d4222ab..02933b6f2 100644 --- a/common/protobuf/control_service.proto +++ b/common/protobuf/control_service.proto @@ -109,6 +109,10 @@ message DmlExecStatusPB { // root's key in an unpartitioned table being ROOT_PARTITION_KEY. 
// The target table name is recorded in the corresponding TQueryExecRequest map<string, DmlPartitionStatusPB> per_partition_status = 1; + + // In case of Iceberg modify statements it contains the data files referenced + // by position delete records. + repeated string data_files_referenced_by_position_deletes = 2; } // Error message exchange format diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift index 1e4e9f5ea..e8d2f7647 100644 --- a/common/thrift/CatalogService.thrift +++ b/common/thrift/CatalogService.thrift @@ -244,6 +244,9 @@ struct TIcebergOperationParam { // The snapshot id when the operation was started 4: optional i64 initial_snapshot_id; + + // The data files referenced by the position delete files. + 7: optional list<string> data_files_referenced_by_position_deletes } // Per-partion info needed by Catalog to handle an INSERT. diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift index 90ec3e29a..3033ab595 100644 --- a/common/thrift/DataSinks.thrift +++ b/common/thrift/DataSinks.thrift @@ -106,13 +106,11 @@ struct THdfsTableSink { } // Structure to encapsulate specific options that are passed down to the -// IcebergDeleteSink. +// IcebergBufferedDeleteSink. struct TIcebergDeleteSink { // Partition expressions of this sink. In case of Iceberg DELETEs these are the // partition spec id and the serialized partition data. 1: required list<Exprs.TExpr> partition_key_exprs - // True if we are using the buffered delete sink. 
- 2: required bool is_buffered = false } // Structure to encapsulate specific options that are passed down to the KuduTableSink diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java b/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java index 6bc51d66f..d1138d00f 100644 --- a/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java +++ b/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java @@ -23,7 +23,7 @@ import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.Pair; import org.apache.impala.planner.DataSink; -import org.apache.impala.planner.IcebergDeleteSink; +import org.apache.impala.planner.IcebergBufferedDeleteSink; import org.apache.impala.planner.TableSink; import org.apache.impala.thrift.TSortingOrder; @@ -65,7 +65,6 @@ public class IcebergDeleteImpl extends IcebergModifyImpl { deleteResultExprs_ = getDeleteResultExprs(analyzer); selectList.addAll(ExprUtil.exprsAsSelectList(deletePartitionKeyExprs_)); selectList.addAll(ExprUtil.exprsAsSelectList(deleteResultExprs_)); - sortExprs_.addAll(deleteResultExprs_); } @Override @@ -80,7 +79,7 @@ public class IcebergDeleteImpl extends IcebergModifyImpl { @Override public DataSink createDataSink() { Preconditions.checkState(modifyStmt_.table_ instanceof FeIcebergTable); - return new IcebergDeleteSink(icePosDelTable_, deletePartitionKeyExprs_, + return new IcebergBufferedDeleteSink(icePosDelTable_, deletePartitionKeyExprs_, deleteResultExprs_); } } \ No newline at end of file diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java b/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java index fda1f11be..660f84c33 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java @@ -113,7 +113,6 @@ public class 
IcebergBufferedDeleteSink extends TableSink { protected void toThriftImpl(TDataSink tsink) { TIcebergDeleteSink icebergDeleteSink = new TIcebergDeleteSink(); icebergDeleteSink.setPartition_key_exprs(Expr.treesToThrift(partitionKeyExprs_)); - icebergDeleteSink.setIs_buffered(true); TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID, TTableSinkType.HDFS, sinkOp_.toThrift()); tTableSink.iceberg_delete_sink = icebergDeleteSink; diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java deleted file mode 100644 index 2e2387ddd..000000000 --- a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java +++ /dev/null @@ -1,145 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.impala.planner; - -import java.util.List; - -import org.apache.impala.analysis.DescriptorTable; -import org.apache.impala.analysis.Expr; -import org.apache.impala.catalog.FeIcebergTable; -import org.apache.impala.common.ByteUnits; -import org.apache.impala.thrift.TDataSink; -import org.apache.impala.thrift.TDataSinkType; -import org.apache.impala.thrift.TExplainLevel; -import org.apache.impala.thrift.TIcebergDeleteSink; -import org.apache.impala.thrift.TQueryOptions; -import org.apache.impala.thrift.TTableSink; -import org.apache.impala.thrift.TTableSinkType; - -/** - * Sink for deleting data from Iceberg tables. Impala deletes data via 'merge-on-read' - * strategy, which means it writes Iceberg position delete files. These files contain - * information (file_path, position) about the deleted rows. Query engines reading from - * an Iceberg table need to exclude the deleted rows from the result of the table scan. - * Impala does this by doing an ANTI JOIN between data files and delete files. - */ -public class IcebergDeleteSink extends TableSink { - final private int deleteTableId_; - - // Exprs for computing the output partition(s). - protected final List<Expr> partitionKeyExprs_; - - public IcebergDeleteSink(FeIcebergTable targetTable, List<Expr> partitionKeyExprs, - List<Expr> outputExprs) { - this(targetTable, partitionKeyExprs, outputExprs, 0); - } - - public IcebergDeleteSink(FeIcebergTable targetTable, List<Expr> partitionKeyExprs, - List<Expr> outputExprs, int deleteTableId) { - super(targetTable, Op.DELETE, outputExprs); - partitionKeyExprs_ = partitionKeyExprs; - deleteTableId_ = deleteTableId; - } - - @Override - public void computeProcessingCost(TQueryOptions queryOptions) { - // The processing cost to export rows. 
- processingCost_ = computeDefaultProcessingCost(); - } - - @Override - public void computeResourceProfile(TQueryOptions queryOptions) { - PlanNode inputNode = fragment_.getPlanRoot(); - final int numInstances = fragment_.getNumInstances(); - // Input is clustered, so it produces a single partition at a time. - final long numBufferedPartitionsPerInstance = 1; - // For regular Parquet files we estimate 1GB memory consumption which is already - // a conservative, i.e. probably too high memory estimate. - // Writing out position delete files means we are writing filenames and positions - // per partition. So assuming 0.5 GB per position delete file writer can be still - // considered a very conservative estimate. - final long perPartitionMemReq = 512L * ByteUnits.MEGABYTE; - - long perInstanceMemEstimate; - // The estimate is based purely on the per-partition mem req if the input cardinality_ - // or the avg row size is unknown. - if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) { - perInstanceMemEstimate = numBufferedPartitionsPerInstance * perPartitionMemReq; - } else { - // The per-partition estimate may be higher than the memory required to buffer - // the entire input data. 
- long perInstanceInputCardinality = - Math.max(1L, inputNode.getCardinality() / numInstances); - long perInstanceInputBytes = - (long) Math.ceil(perInstanceInputCardinality * inputNode.getAvgRowSize()); - long perInstanceMemReq = - PlanNode.checkedMultiply(numBufferedPartitionsPerInstance, perPartitionMemReq); - perInstanceMemEstimate = Math.min(perInstanceInputBytes, perInstanceMemReq); - } - resourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate); - } - - @Override - public void appendSinkExplainString(String prefix, String detailPrefix, - TQueryOptions queryOptions, TExplainLevel explainLevel, StringBuilder output) { - output.append(String.format("%sDELETE FROM ICEBERG [%s]\n", prefix, - targetTable_.getFullName())); - if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { - output.append(detailPrefix + "output exprs: ") - .append(Expr.getExplainString(outputExprs_, explainLevel) + "\n"); - if (!partitionKeyExprs_.isEmpty()) { - output.append(detailPrefix + "partition keys: ") - .append(Expr.getExplainString(partitionKeyExprs_, explainLevel) + "\n"); - } - } - } - - @Override - protected String getLabel() { - return "ICEBERG DELETER"; - } - - @Override - protected void toThriftImpl(TDataSink tsink) { - TIcebergDeleteSink icebergDeleteSink = new TIcebergDeleteSink(); - icebergDeleteSink.setPartition_key_exprs(Expr.treesToThrift(partitionKeyExprs_)); - TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID, - TTableSinkType.HDFS, sinkOp_.toThrift()); - tTableSink.iceberg_delete_sink = icebergDeleteSink; - tTableSink.setTarget_table_id(deleteTableId_); - tsink.table_sink = tTableSink; - tsink.output_exprs = Expr.treesToThrift(outputExprs_); - } - - @Override - protected TDataSinkType getSinkType() { - return TDataSinkType.TABLE_SINK; - } - - @Override - public void collectExprs(List<Expr> exprs) { - exprs.addAll(partitionKeyExprs_); - exprs.addAll(outputExprs_); - } - - @Override - public void 
computeRowConsumptionAndProductionToCost() { - super.computeRowConsumptionAndProductionToCost(); - fragment_.setFixedInstanceCount(fragment_.getNumInstances()); - } -} diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java index 7c869f0da..5da64cba3 100644 --- a/fe/src/main/java/org/apache/impala/planner/TableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java @@ -133,11 +133,8 @@ public abstract class TableSink extends DataSink { if (sinkAction == Op.INSERT) { return new HdfsTableSink(table, partitionKeyExprs,outputExprs, overwrite, inputIsClustered, sortProperties, writeId, maxTableSinks, isResultSink); - } else if (sinkAction == Op.DELETE) { - return new IcebergDeleteSink((FeIcebergTable)table, partitionKeyExprs, - outputExprs, maxTableSinks); } else { - // We don't support any other sink actions yet. + // Other SINK actions are either not supported or created directly. Preconditions.checkState(false); } } diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java index 05f1d4e2e..f126bf16e 100644 --- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java @@ -375,6 +375,9 @@ public class IcebergCatalogOpExecutor { // affected by this DELETE operation. 
rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id()); rowDelta.validateNoConflictingDataFiles(); + rowDelta.validateDataFilesExist( + icebergOp.getData_files_referenced_by_position_deletes()); + rowDelta.validateDeletedFiles(); rowDelta.commit(); } catch (ValidationException e) { throw new ImpalaRuntimeException(e.getMessage(), e); @@ -403,6 +406,9 @@ public class IcebergCatalogOpExecutor { rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id()); rowDelta.validateNoConflictingDataFiles(); rowDelta.validateNoConflictingDeleteFiles(); + rowDelta.validateDataFilesExist( + icebergOp.getData_files_referenced_by_position_deletes()); + rowDelta.validateDeletedFiles(); rowDelta.commit(); } catch (ValidationException e) { throw new ImpalaRuntimeException(e.getMessage(), e); @@ -566,4 +572,4 @@ public class IcebergCatalogOpExecutor { String.valueOf(version)); updateProps.commit(); } -} \ No newline at end of file +} diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test index c75cdaea0..a0b8aceed 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test @@ -1,10 +1,6 @@ DELETE FROM iceberg_v2_no_deletes where i = 3 ---- PLAN -DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE] -| -01:SORT -| order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST -| row-size=20B cardinality=1 +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE] | 00:SCAN HDFS [functional_parquet.iceberg_v2_no_deletes] HDFS partitions=1/1 files=1 size=625B @@ -12,11 +8,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE] Iceberg snapshot id: 728158873687794725 row-size=24B cardinality=1 ---- DISTRIBUTEDPLAN -DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE] -| -01:SORT -| order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST -| row-size=20B cardinality=1 +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE] | 00:SCAN HDFS [functional_parquet.iceberg_v2_no_deletes] HDFS partitions=1/1 files=1 size=625B @@ -26,11 +18,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE] ==== DELETE FROM iceberg_v2_delete_positional where id = 15 ---- PLAN -DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] -| -03:SORT -| order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST -| row-size=20B cardinality=1 +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] | 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN] | row-size=28B cardinality=1 @@ -46,11 +34,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE Iceberg snapshot id: 5725822353600261755 row-size=28B cardinality=1 ---- DISTRIBUTEDPLAN -DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] -| -04:SORT -| order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST -| row-size=20B cardinality=1 +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] | 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=28B cardinality=1 @@ -71,11 +55,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE DELETE FROM iceberg_v2_delete_positional where id = (select min(id) from iceberg_v2_delete_positional) ---- PLAN -DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] -| -08:SORT -| order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST -| row-size=20B cardinality=2 +BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] | 07:HASH JOIN [LEFT SEMI JOIN] | hash predicates: id = min(id) @@ -113,11 +93,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE Iceberg snapshot id: 5725822353600261755 row-size=28B cardinality=3 ---- DISTRIBUTEDPLAN -DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] -| -13:SORT -| order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST -| row-size=20B cardinality=2 +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] | 07:HASH JOIN [LEFT SEMI JOIN, BROADCAST] | hash predicates: id = min(id) @@ -169,11 +145,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE ==== DELETE FROM iceberg_v2_delete_positional WHERE FILE__POSITION = id ---- PLAN -DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] -| -03:SORT -| order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST -| row-size=20B cardinality=1 +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] | 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN] | row-size=28B cardinality=1 @@ -189,11 +161,7 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE Iceberg snapshot id: 5725822353600261755 row-size=28B cardinality=1 ---- DISTRIBUTEDPLAN -DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] -| -04:SORT -| order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST -| row-size=20B cardinality=1 +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] | 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] | row-size=28B cardinality=1 @@ -213,10 +181,10 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DE ==== delete from 
iceberg_v2_partitioned_position_deletes where id = 20; ---- PLAN -DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] | 03:SORT -| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST | row-size=36B cardinality=1 | 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN] @@ -233,10 +201,10 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes- Iceberg snapshot id: 8885697082976537578 row-size=40B cardinality=2 ---- DISTRIBUTEDPLAN -DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] | 05:SORT -| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST | row-size=36B cardinality=1 | 04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)] @@ -259,10 +227,10 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes- ==== delete from iceberg_v2_partitioned_position_deletes where action = 'click'; ---- PLAN -DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] | 03:SORT -| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC 
NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST | row-size=36B cardinality=3 | 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN] @@ -279,10 +247,10 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes- skipped Iceberg predicates: action = 'click' row-size=36B cardinality=6 ---- DISTRIBUTEDPLAN -DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] | 04:SORT -| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST | row-size=36B cardinality=3 | 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED] @@ -303,10 +271,10 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes- ==== delete from iceberg_v2_partitioned_position_deletes where user like 'A%'; ---- PLAN -DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] | 03:SORT -| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST | row-size=36B cardinality=1 | 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN] @@ -323,10 +291,10 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes- Iceberg snapshot id: 8885697082976537578 row-size=48B cardinality=2 ---- DISTRIBUTEDPLAN -DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] | 05:SORT -| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST | row-size=36B cardinality=1 | 04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)] @@ -350,10 +318,10 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes- delete from iceberg_v2_partitioned_position_deletes where id = (select max(id) from iceberg_v2_delete_positional); ---- PLAN -DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] | 08:SORT -| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST | row-size=36B cardinality=10 | 07:HASH JOIN [LEFT SEMI JOIN] @@ -392,10 +360,10 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes- Iceberg snapshot id: 8885697082976537578 row-size=40B cardinality=20 ---- DISTRIBUTEDPLAN -DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] | 14:SORT -| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| 
order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST | row-size=36B cardinality=10 | 13:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)] @@ -450,10 +418,10 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes- ==== DELETE FROM iceberg_v2_partitioned_position_deletes WHERE FILE__POSITION = id ---- PLAN -DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] | 03:SORT -| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST | row-size=36B cardinality=1 | 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN] @@ -470,10 +438,10 @@ DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes- Iceberg snapshot id: 8885697082976537578 row-size=40B cardinality=2 ---- DISTRIBUTEDPLAN -DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] +BUFFERED DELETE FROM ICEBERG [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE] | 05:SORT -| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| order by: partition__spec__id ASC NULLS LAST, iceberg__partition__serialized ASC NULLS LAST | row-size=36B cardinality=1 | 04:EXCHANGE [HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)] diff --git a/tests/stress/test_update_stress.py 
b/tests/stress/test_update_stress.py index 9cc767746..35f380e31 100644 --- a/tests/stress/test_update_stress.py +++ b/tests/stress/test_update_stress.py @@ -149,7 +149,7 @@ class TestIcebergConcurrentUpdateStress(ImpalaTestSuite): run_tasks([updater_a, updater_b, updater_c] + checkers) -class TestIcebergConcurrentDeletesAndUpdates(ImpalaTestSuite): +class TestIcebergConcurrentOperations(ImpalaTestSuite): """This test checks that concurrent DELETE and UPDATE operations leave the table in a consistent state.""" @@ -159,12 +159,12 @@ class TestIcebergConcurrentDeletesAndUpdates(ImpalaTestSuite): @classmethod def add_test_dimensions(cls): - super(TestIcebergConcurrentDeletesAndUpdates, cls).add_test_dimensions() + super(TestIcebergConcurrentOperations, cls).add_test_dimensions() cls.ImpalaTestMatrix.add_constraint( lambda v: (v.get_value('table_format').file_format == 'parquet' and v.get_value('table_format').compression_codec == 'snappy')) - def _impala_role_concurrent_deleter(self, tbl_name, flag, num_rows): + def _impala_role_concurrent_deleter(self, tbl_name, all_rows_deleted, num_rows): """Deletes every row from the table one by one.""" target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) @@ -179,15 +179,15 @@ class TestIcebergConcurrentDeletesAndUpdates(ImpalaTestSuite): # Exceptions are expected due to concurrent operations. 
print(str(e)) time.sleep(random.random()) - flag.value = 1 + all_rows_deleted.value = 1 impalad_client.close() - def _impala_role_concurrent_writer(self, tbl_name, flag): + def _impala_role_concurrent_writer(self, tbl_name, all_rows_deleted): """Updates every row in the table in a loop.""" target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) impalad_client.set_configuration_option("SYNC_DDL", "true") - while flag.value != 1: + while all_rows_deleted.value != 1: try: impalad_client.execute( "update {0} set j = j + 1".format(tbl_name)) @@ -197,7 +197,21 @@ class TestIcebergConcurrentDeletesAndUpdates(ImpalaTestSuite): time.sleep(random.random()) impalad_client.close() - def _impala_role_concurrent_checker(self, tbl_name, flag, num_rows): + def _impala_role_concurrent_optimizer(self, tbl_name, all_rows_deleted): + """Optimizes the table in a loop.""" + target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) + impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + impalad_client.set_configuration_option("SYNC_DDL", "true") + while all_rows_deleted.value != 1: + try: + impalad_client.execute("optimize table {0}".format(tbl_name)) + except Exception as e: + # Exceptions are expected due to concurrent operations. + print(str(e)) + time.sleep(random.random()) + impalad_client.close() + + def _impala_role_concurrent_checker(self, tbl_name, all_rows_deleted, num_rows): """Checks if the table's invariant is true. The invariant is that we have a consecutive range of 'id's starting from N to num_rows - 1. 
And 'j's are equal.""" def verify_result_set(result): @@ -214,7 +228,7 @@ class TestIcebergConcurrentDeletesAndUpdates(ImpalaTestSuite): target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) - while flag.value != 1: + while all_rows_deleted.value != 1: result = impalad_client.execute("select * from %s order by id" % tbl_name) verify_result_set(result) time.sleep(random.random()) @@ -232,19 +246,52 @@ class TestIcebergConcurrentDeletesAndUpdates(ImpalaTestSuite): stored as iceberg tblproperties('format-version'='2')""".format(tbl_name,)) + num_rows = 10 + for i in range(num_rows): + self.client.execute("insert into {} values ({}, 0)".format(tbl_name, i)) + + all_rows_deleted = Value('i', 0) + deleter = Task(self._impala_role_concurrent_deleter, tbl_name, all_rows_deleted, + num_rows) + updater = Task(self._impala_role_concurrent_writer, tbl_name, all_rows_deleted) + checker = Task(self._impala_role_concurrent_checker, tbl_name, all_rows_deleted, + num_rows) + run_tasks([deleter, updater, checker]) + + result = self.client.execute("select count(*) from {}".format(tbl_name)) + assert result.data == ['0'] + + @pytest.mark.stress + @UniqueDatabase.parametrize(sync_ddl=True) + def test_iceberg_deletes_and_updates_and_optimize(self, unique_database): + """Issues DELETE and UPDATE statements in parallel in a way that some + invariants must be true when a spectator process inspects the table. + An optimizer thread also invokes OPTIMIZE regularly on the table.""" + + tbl_name = "%s.test_concurrent_write_and_optimize" % unique_database + self.client.set_configuration_option("SYNC_DDL", "true") + self.client.execute("""create table {0} (id int, j bigint) + stored as iceberg + tblproperties('format-version'='2')""".format(tbl_name,)) + num_rows = 10 values_str = "" + # Prepare INSERT statement of 'num_rows' records. 
for i in range(num_rows): values_str += "({}, 0)".format(i) if i != num_rows - 1: values_str += ", " self.client.execute("insert into {} values {}".format(tbl_name, values_str)) - flag = Value('i', 0) - deleter = Task(self._impala_role_concurrent_deleter, tbl_name, flag, num_rows) - updater = Task(self._impala_role_concurrent_writer, tbl_name, flag) - checker = Task(self._impala_role_concurrent_checker, tbl_name, flag, num_rows) - run_tasks([deleter, updater, checker]) + all_rows_deleted = Value('i', 0) + deleter = Task(self._impala_role_concurrent_deleter, tbl_name, all_rows_deleted, + num_rows) + updater = Task(self._impala_role_concurrent_writer, tbl_name, all_rows_deleted) + optimizer = Task(self._impala_role_concurrent_optimizer, tbl_name, + all_rows_deleted) + checker = Task(self._impala_role_concurrent_checker, tbl_name, all_rows_deleted, + num_rows) + run_tasks([deleter, updater, optimizer, checker]) result = self.client.execute("select count(*) from {}".format(tbl_name)) assert result.data == ['0']
