This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push: new c57921225 IMPALA-11877: (part 1) Add support for DELETE statements for UNPARTITIONED Iceberg tables c57921225 is described below commit c57921225113ef06d2d4358652412f6f5f7f15b6 Author: Zoltan Borok-Nagy <borokna...@cloudera.com> AuthorDate: Mon Apr 3 14:36:18 2023 +0200 IMPALA-11877: (part 1) Add support for DELETE statements for UNPARTITIONED Iceberg tables This patch adds support for DELETE statements on unpartitioned Iceberg tables. Impala uses the 'merge-on-read' mode with position delete files. The patch reuses the existing IcebergPositionDeleteTable as the target table of the DELETE statements, because this table already has the same schema as position delete files, even with correct Iceberg field IDs. The patch basically rewrites DELETE statements to INSERT statements, e.g.: from: DELETE FROM ice_t WHERE id = 42; to: INSERT INTO ice_t-POSITION-DELETE SELECT INPUT__FILE__NAME, FILE__POSITION FROM ice_t WHERE id = 42; Position delete files need to be ordered by (file_path, pos), so we add an extra SORT node before the table sink operator. In the backend, the patch adds a new table sink operator, the IcebergDeleteSink. It writes the incoming rows (file_path, position) to delete files. It reuses a lot of code from HdfsTableSink, so this patch moves the common code to the new common base class: TableSinkBase. The coordinator then collects the written delete files and invokes UpdateCatalog to finalize the DELETE statement. The Catalog then uses Iceberg APIs to create a new snapshot with the created delete files. It also validates that there were no conflicting data files written since the operation started. 
Testing: * added planer test * e2e tests * interop test between Impala and Hive Change-Id: Ic933b2295abe54b46d2a736961219988ff42915b Reviewed-on: http://gerrit.cloudera.org:8080/19776 Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Reviewed-by: Gabor Kaszab <gaborkas...@cloudera.com> --- be/src/exec/CMakeLists.txt | 2 + be/src/exec/data-sink.cc | 8 +- be/src/exec/file-metadata-utils.cc | 1 - be/src/exec/hdfs-table-sink.cc | 269 +------------------ be/src/exec/hdfs-table-sink.h | 118 ++------- be/src/exec/hdfs-table-writer.cc | 2 +- be/src/exec/hdfs-table-writer.h | 6 +- be/src/exec/iceberg-delete-sink.cc | 165 ++++++++++++ be/src/exec/iceberg-delete-sink.h | 89 +++++++ be/src/exec/parquet/hdfs-parquet-table-writer.cc | 8 +- be/src/exec/parquet/hdfs-parquet-table-writer.h | 4 +- be/src/exec/table-sink-base.cc | 293 +++++++++++++++++++++ be/src/exec/table-sink-base.h | 144 ++++++++++ be/src/runtime/coordinator.cc | 2 +- be/src/runtime/descriptors.cc | 1 + be/src/service/client-request-state.cc | 23 +- common/thrift/CatalogObjects.thrift | 2 +- common/thrift/CatalogService.thrift | 8 +- common/thrift/DataSinks.thrift | 4 + common/thrift/Query.thrift | 20 +- common/thrift/Types.thrift | 5 + .../org/apache/impala/analysis/DeleteStmt.java | 3 +- .../impala/analysis/IcebergPartitionSpec.java | 8 +- .../org/apache/impala/analysis/ModifyStmt.java | 181 +++++++++++-- .../java/org/apache/impala/analysis/SlotRef.java | 11 +- .../org/apache/impala/analysis/UpdateStmt.java | 3 +- .../org/apache/impala/catalog/FeIcebergTable.java | 39 +++ .../impala/catalog/IcebergPositionDeleteTable.java | 10 +- .../org/apache/impala/catalog/IcebergTable.java | 4 +- .../org/apache/impala/catalog/VirtualColumn.java | 9 +- .../org/apache/impala/planner/HdfsScanNode.java | 9 +- .../apache/impala/planner/IcebergDeleteSink.java | 160 +++++++++++ .../java/org/apache/impala/planner/Planner.java | 34 ++- .../java/org/apache/impala/planner/TableSink.java | 12 + 
.../apache/impala/service/CatalogOpExecutor.java | 2 +- .../java/org/apache/impala/service/Frontend.java | 60 ++++- .../impala/service/IcebergCatalogOpExecutor.java | 40 +++ .../impala/analysis/AnalyzeModifyStmtsTest.java | 9 +- .../org/apache/impala/planner/PlannerTest.java | 10 + .../queries/PlannerTest/iceberg-v2-delete.test | 195 ++++++++++++++ .../queries/QueryTest/iceberg-delete-complex.test | 106 ++++++++ .../queries/QueryTest/iceberg-delete.test | 117 ++++++++ .../queries/QueryTest/iceberg-negative.test | 52 ++++ tests/query_test/test_iceberg.py | 59 ++++- 44 files changed, 1850 insertions(+), 457 deletions(-) diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 52fd3b95b..776f50fee 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -76,6 +76,7 @@ add_library(Exec hdfs-table-sink.cc hdfs-table-writer.cc hdfs-text-table-writer.cc + iceberg-delete-sink.cc incr-stats-util.cc join-builder.cc nested-loop-join-builder.cc @@ -93,6 +94,7 @@ add_library(Exec sort-node.cc streaming-aggregation-node.cc subplan-node.cc + table-sink-base.cc text-converter.cc topn-node.cc union-node.cc diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc index 790b7b214..d41bbac8e 100644 --- a/be/src/exec/data-sink.cc +++ b/be/src/exec/data-sink.cc @@ -24,6 +24,7 @@ #include "exec/exec-node.h" #include "exec/hbase/hbase-table-sink.h" #include "exec/hdfs-table-sink.h" +#include "exec/iceberg-delete-sink.h" #include "exec/kudu/kudu-table-sink.h" #include "exec/kudu/kudu-util.h" #include "exec/blocking-plan-root-sink.h" @@ -83,7 +84,12 @@ Status DataSinkConfig::CreateConfig(const TDataSink& thrift_sink, if (!thrift_sink.__isset.table_sink) return Status("Missing table sink."); switch (thrift_sink.table_sink.type) { case TTableSinkType::HDFS: - *data_sink = pool->Add(new HdfsTableSinkConfig()); + if (thrift_sink.table_sink.action == TSinkAction::INSERT) { + *data_sink = pool->Add(new HdfsTableSinkConfig()); + } else if 
(thrift_sink.table_sink.action == TSinkAction::DELETE) { + // Currently only Iceberg tables support DELETE operations for FS tables. + *data_sink = pool->Add(new IcebergDeleteSinkConfig()); + } break; case TTableSinkType::KUDU: RETURN_IF_ERROR(CheckKuduAvailability()); diff --git a/be/src/exec/file-metadata-utils.cc b/be/src/exec/file-metadata-utils.cc index af575c8f0..f886b50c1 100644 --- a/be/src/exec/file-metadata-utils.cc +++ b/be/src/exec/file-metadata-utils.cc @@ -89,7 +89,6 @@ void FileMetadataUtils::AddIcebergColumns(MemPool* mem_pool, Tuple** template_tu *template_tuple = Tuple::Create(tuple_desc->byte_size(), mem_pool); } for (const SlotDescriptor* slot_desc : scan_node_->tuple_desc()->slots()) { - // The partition column of Iceberg tables must not be virtual column if (slot_desc->IsVirtual()) continue; const SchemaPath& path = slot_desc->col_path(); if (path.size() != 1) continue; diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc index 355cb7479..2534f711b 100644 --- a/be/src/exec/hdfs-table-sink.cc +++ b/be/src/exec/hdfs-table-sink.cc @@ -78,10 +78,8 @@ void HdfsTableSinkConfig::Close() { HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sink_config, const THdfsTableSink& hdfs_sink, RuntimeState* state) - : DataSink(sink_id, sink_config, "HdfsTableSink", state), - table_desc_(nullptr), + : TableSinkBase(sink_id, sink_config, "HdfsTableSink", state), prototype_partition_(nullptr), - table_id_(sink_config.tsink_->table_sink.target_table_id), skip_header_line_count_( hdfs_sink.__isset.skip_header_line_count ? 
hdfs_sink.skip_header_line_count : 0), overwrite_(hdfs_sink.overwrite), @@ -111,9 +109,9 @@ HdfsTableSink::HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sin } Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) { - RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker)); - unique_id_str_ = PrintId(state->fragment_instance_id(), "-"); SCOPED_TIMER(profile()->total_time_counter()); + RETURN_IF_ERROR(TableSinkBase::Prepare(state, parent_mem_tracker)); + unique_id_str_ = PrintId(state->fragment_instance_id(), "-"); RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_key_exprs_, state, state->obj_pool(), expr_perm_pool_.get(), expr_results_pool_.get(), &partition_key_expr_evals_)); @@ -148,14 +146,6 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke DCHECK_GE(output_expr_evals_.size(), table_desc_->num_cols() - table_desc_->num_clustering_cols()) << DebugString(); - partitions_created_counter_ = ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT); - files_created_counter_ = ADD_COUNTER(profile(), "FilesCreated", TUnit::UNIT); - rows_inserted_counter_ = ADD_COUNTER(profile(), "RowsInserted", TUnit::UNIT); - bytes_written_counter_ = ADD_COUNTER(profile(), "BytesWritten", TUnit::BYTES); - encode_timer_ = ADD_TIMER(profile(), "EncodeTimer"); - hdfs_write_timer_ = ADD_TIMER(profile(), "HdfsWriteTimer"); - compress_timer_ = ADD_TIMER(profile(), "CompressTimer"); - return Status::OK(); } @@ -220,7 +210,8 @@ void HdfsTableSink::BuildPartitionDescMap() { } Status HdfsTableSink::Open(RuntimeState* state) { - RETURN_IF_ERROR(DataSink::Open(state)); + SCOPED_TIMER(profile()->total_time_counter()); + RETURN_IF_ERROR(TableSinkBase::Open(state)); DCHECK_EQ(partition_key_exprs_.size(), partition_key_expr_evals_.size()); RETURN_IF_ERROR(ScalarExprEvaluator::Open(partition_key_expr_evals_, state)); if (!IsIceberg()) BuildPartitionDescMap(); @@ -228,115 +219,6 @@ Status 
HdfsTableSink::Open(RuntimeState* state) { return Status::OK(); } -void HdfsTableSink::BuildHdfsFileNames( - const HdfsPartitionDescriptor& partition_descriptor, - OutputPartition* output_partition, const string &external_partition_name) { - - // Create final_hdfs_file_name_prefix and tmp_hdfs_file_name_prefix. - // Path: <hdfs_base_dir>/<partition_values>/<unique_id_str> - // Or, for transactional tables: - // Path: <hdfs_base_dir>/<partition_values>/<transaction_directory>/<unique_id_str> - // Where <transaction_directory> is either a 'base' or a 'delta' directory in Hive ACID - // terminology. - - // Temporary files are written under the following path which is unique to this sink: - // <table_dir>/_impala_insert_staging/<query_id>/<per_fragment_unique_id>_dir/ - // Both the temporary directory and the file name, when moved to the real partition - // directory must be unique. - // Prefix the directory name with "." to make it hidden and append "_dir" at the end - // of the directory to avoid name clashes for unpartitioned tables. - // The files are located in <partition_values>/<random_value>_data under - // tmp_hdfs_file_name_prefix. - - // Use the query id as filename. - const string& query_suffix = Substitute("$0_$1_data", unique_id_str_, rand()); - - output_partition->tmp_hdfs_dir_name = - Substitute("$0/.$1_$2_dir/", staging_dir_, unique_id_str_, rand()); - output_partition->tmp_hdfs_file_name_prefix = Substitute("$0$1/$2", - output_partition->tmp_hdfs_dir_name, output_partition->partition_name, - query_suffix); - - if (HasExternalOutputDir()) { - // When an external FE has provided a staging directory we use that directly. - // We are trusting that the external frontend implementation has done appropriate - // authorization checks on the external output directory. 
- output_partition->final_hdfs_file_name_prefix = Substitute("$0/$1/", - external_output_dir_, external_partition_name); - } else if (partition_descriptor.location().empty()) { - output_partition->final_hdfs_file_name_prefix = Substitute("$0/$1/", - table_desc_->hdfs_base_dir(), output_partition->partition_name); - } else { - // If the partition descriptor has a location (as set by alter table add partition - // with a location clause), that provides the complete directory path for this - // partition. No partition key suffix ("p=1/j=foo/") should be added. - output_partition->final_hdfs_file_name_prefix = - Substitute("$0/", partition_descriptor.location()); - } - if (IsHiveAcid()) { - if (HasExternalOutputDir()) { - // The 0 padding on base and delta is to match the behavior of Hive since various - // systems will expect a certain format for dynamic partition creation. Additionally - // include an 0 statement id for delta directory so various Hive AcidUtils detect - // the directory (such as AcidUtils.baseOrDeltaSubdir()). Multiple statements in a - // single transaction is not supported. - if (overwrite_) { - output_partition->final_hdfs_file_name_prefix += StringPrintf("/base_%07ld/", - write_id_); - } else { - output_partition->final_hdfs_file_name_prefix += StringPrintf( - "/delta_%07ld_%07ld_0000/", write_id_, write_id_); - } - } else { - string acid_dir = Substitute(overwrite_ ? "/base_$0/" : "/delta_$0_$0/", write_id_); - output_partition->final_hdfs_file_name_prefix += acid_dir; - } - } - if (IsIceberg()) { - //TODO: implement LocationProviders. 
- if (output_partition->partition_name.empty()) { - output_partition->final_hdfs_file_name_prefix = - Substitute("$0/data/", table_desc_->IcebergTableLocation()); - } else { - output_partition->final_hdfs_file_name_prefix = - Substitute("$0/data/$1/", table_desc_->IcebergTableLocation(), - output_partition->partition_name); - } - } - output_partition->final_hdfs_file_name_prefix += query_suffix; - - output_partition->num_files = 0; -} - -Status HdfsTableSink::WriteRowsToPartition( - RuntimeState* state, RowBatch* batch, PartitionPair* partition_pair) { - // The rows of this batch may span multiple files. We repeatedly pass the row batch to - // the writer until it sets new_file to false, indicating that all rows have been - // written. The writer tracks where it is in the batch when it returns with new_file - // set. - bool new_file; - while (true) { - OutputPartition* output_partition = partition_pair->first.get(); - Status status = - output_partition->writer->AppendRows(batch, partition_pair->second, &new_file); - if (!status.ok()) { - // IMPALA-10607: Deletes partition file if staging is skipped when appending rows - // fails. Otherwise, it leaves the file in un-finalized state. 
- if (ShouldSkipStaging(state, output_partition)) { - status.MergeStatus(ClosePartitionFile(state, output_partition)); - hdfsDelete(output_partition->hdfs_connection, - output_partition->current_file_name.c_str(), 0); - } - return status; - } - if (!new_file) break; - RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition)); - RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition)); - } - partition_pair->second.clear(); - return Status::OK(); -} - Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch) { DCHECK_GT(batch->num_rows(), 0); DCHECK(!dynamic_partition_key_expr_evals_.empty()); @@ -400,104 +282,6 @@ Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batc return Status::OK(); } -Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state, - OutputPartition* output_partition) { - SCOPED_TIMER(ADD_TIMER(profile(), "TmpFileCreateTimer")); - string file_name_pattern = - output_partition->writer->file_extension().empty() ? "$0.$1" : "$0.$1.$2"; - string final_location = Substitute(file_name_pattern, - output_partition->final_hdfs_file_name_prefix, output_partition->num_files, - output_partition->writer->file_extension()); - - // If ShouldSkipStaging() is true, then the table sink will write the file(s) for this - // partition to the final location directly. If it is false, the file(s) will be written - // to a temporary staging location which will be moved by the coordinator to the final - // location. - if (ShouldSkipStaging(state, output_partition)) { - output_partition->current_file_name = final_location; - output_partition->current_file_final_name = ""; - } else { - output_partition->current_file_name = Substitute(file_name_pattern, - output_partition->tmp_hdfs_file_name_prefix, output_partition->num_files, - output_partition->writer->file_extension()); - // Save the ultimate destination for this file (it will be moved by the coordinator). 
- output_partition->current_file_final_name = final_location; - } - // Check if tmp_hdfs_file_name exists. - const char* tmp_hdfs_file_name_cstr = - output_partition->current_file_name.c_str(); - - if (hdfsExists(output_partition->hdfs_connection, tmp_hdfs_file_name_cstr) == 0) { - return Status(GetHdfsErrorMsg("Temporary HDFS file already exists: ", - output_partition->current_file_name)); - } - - // This is the block size from the HDFS partition metadata. - uint64_t block_size = output_partition->partition_descriptor->block_size(); - // hdfsOpenFile takes a 4 byte integer as the block size. - if (block_size > numeric_limits<int32_t>::max()) { - return Status(Substitute("HDFS block size must be smaller than 2GB but is configured " - "in the HDFS partition to $0.", block_size)); - } - - if (block_size == 0) block_size = output_partition->writer->default_block_size(); - if (block_size > numeric_limits<int32_t>::max()) { - return Status(Substitute("HDFS block size must be smaller than 2GB but the target " - "table requires $0.", block_size)); - } - - DCHECK_LE(block_size, numeric_limits<int32_t>::max()); - output_partition->tmp_hdfs_file = hdfsOpenFile(output_partition->hdfs_connection, - tmp_hdfs_file_name_cstr, O_WRONLY, 0, 0, block_size); - - VLOG_FILE << "hdfsOpenFile() file=" << tmp_hdfs_file_name_cstr; - if (output_partition->tmp_hdfs_file == nullptr) { - return Status(GetHdfsErrorMsg("Failed to open HDFS file for writing: ", - output_partition->current_file_name)); - } - - if (IsS3APath(tmp_hdfs_file_name_cstr) || - IsABFSPath(tmp_hdfs_file_name_cstr) || - IsADLSPath(tmp_hdfs_file_name_cstr) || - IsOSSPath(tmp_hdfs_file_name_cstr) || - IsGcsPath(tmp_hdfs_file_name_cstr) || - IsCosPath(tmp_hdfs_file_name_cstr) || - IsSFSPath(tmp_hdfs_file_name_cstr) || - IsOzonePath(tmp_hdfs_file_name_cstr)) { - // On S3A, the file cannot be stat'ed until after it's closed, and even so, the block - // size reported will be just the filesystem default. 
Similarly, the block size - // reported for ADLS will be the filesystem default. So, remember the requested block - // size. - // TODO: IMPALA-9437: Ozone does not support stat'ing a file until after it's closed, - // so for now skip the call to hdfsGetPathInfo. - output_partition->block_size = block_size; - } else { - // HDFS may choose to override the block size that we've recommended, so for non-S3 - // files, we get the block size by stat-ing the file. - hdfsFileInfo* info = hdfsGetPathInfo(output_partition->hdfs_connection, - output_partition->current_file_name.c_str()); - if (info == nullptr) { - return Status(GetHdfsErrorMsg("Failed to get info on temporary HDFS file: ", - output_partition->current_file_name)); - } - output_partition->block_size = info->mBlockSize; - hdfsFreeFileInfo(info, 1); - } - - ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(1); - COUNTER_ADD(files_created_counter_, 1); - - ++output_partition->num_files; - output_partition->current_file_rows = 0; - Status status = output_partition->writer->InitNewFile(); - if (!status.ok()) { - status.MergeStatus(ClosePartitionFile(state, output_partition)); - hdfsDelete(output_partition->hdfs_connection, - output_partition->current_file_name.c_str(), 0); - } - return status; -} - string HdfsTableSink::GetPartitionName(int i) { if (IsIceberg()) { DCHECK_LT(i, partition_key_expr_evals_.size()); @@ -741,40 +525,6 @@ Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) { return Status::OK(); } -Status HdfsTableSink::FinalizePartitionFile( - RuntimeState* state, OutputPartition* partition) { - if (partition->tmp_hdfs_file == nullptr && !overwrite_) return Status::OK(); - SCOPED_TIMER(ADD_TIMER(profile(), "FinalizePartitionFileTimer")); - - // OutputPartition writer could be nullptr if there is no row to output. 
- if (partition->writer.get() != nullptr) { - RETURN_IF_ERROR(partition->writer->Finalize()); - state->dml_exec_state()->UpdatePartition( - partition->partition_name, partition->current_file_rows, - &partition->writer->stats()); - state->dml_exec_state()->AddCreatedFile(*partition, IsIceberg(), - partition->writer->iceberg_file_stats()); - } - - RETURN_IF_ERROR(ClosePartitionFile(state, partition)); - return Status::OK(); -} - -Status HdfsTableSink::ClosePartitionFile( - RuntimeState* state, OutputPartition* partition) { - if (partition->tmp_hdfs_file == nullptr) return Status::OK(); - int hdfs_ret = hdfsCloseFile(partition->hdfs_connection, partition->tmp_hdfs_file); - VLOG_FILE << "hdfsCloseFile() file=" << partition->current_file_name; - partition->tmp_hdfs_file = nullptr; - ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(-1); - if (hdfs_ret != 0) { - return Status(ErrorMsg(TErrorCode::GENERAL, - GetHdfsErrorMsg("Failed to close HDFS file: ", - partition->current_file_name))); - } - return Status::OK(); -} - Status HdfsTableSink::FlushFinal(RuntimeState* state) { DCHECK(!closed_); SCOPED_TIMER(profile()->total_time_counter()); @@ -812,17 +562,10 @@ void HdfsTableSink::Close(RuntimeState* state) { } partition_keys_to_output_partitions_.clear(); ScalarExprEvaluator::Close(partition_key_expr_evals_, state); - DataSink::Close(state); + TableSinkBase::Close(state); closed_ = true; } -bool HdfsTableSink::ShouldSkipStaging(RuntimeState* state, OutputPartition* partition) { - if (IsTransactional() || HasExternalOutputDir() || is_result_sink_) return true; - // We skip staging if we are writing query results - return (IsS3APath(partition->final_hdfs_file_name_prefix.c_str()) && !overwrite_ && - state->query_options().s3_skip_insert_staging); -} - string HdfsTableSink::DebugString() const { stringstream out; out << "HdfsTableSink(overwrite=" << (overwrite_ ? 
"true" : "false") diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h index 5aa637949..95cdc0010 100644 --- a/be/src/exec/hdfs-table-sink.h +++ b/be/src/exec/hdfs-table-sink.h @@ -22,8 +22,8 @@ #include <hdfs.h> #include <boost/unordered_map.hpp> -#include "exec/data-sink.h" #include "exec/output-partition.h" +#include "exec/table-sink-base.h" #include "runtime/descriptors.h" namespace impala { @@ -87,45 +87,44 @@ class HdfsTableSinkConfig : public DataSinkConfig { /// a commit for the ACID transaction. /// The name of the output directory will be /// <table base dir>/<partition dirs>/<ACID base or delta directory> -class HdfsTableSink : public DataSink { +class HdfsTableSink : public TableSinkBase { public: HdfsTableSink(TDataSinkId sink_id, const HdfsTableSinkConfig& sink_config, const THdfsTableSink& hdfs_sink, RuntimeState* state); /// Prepares output_exprs and partition_key_exprs, and connects to HDFS. - virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker); + Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override; /// Opens output_exprs and partition_key_exprs, prepares the single output partition for /// static inserts, and populates partition_descriptor_map_. - virtual Status Open(RuntimeState* state); + Status Open(RuntimeState* state) override; /// Append all rows in batch to the temporary Hdfs files corresponding to partitions. - virtual Status Send(RuntimeState* state, RowBatch* batch); + Status Send(RuntimeState* state, RowBatch* batch) override; /// Finalize any open files. /// TODO: IMPALA-2988: Move calls to functions that can fail in Close() to FlushFinal() - virtual Status FlushFinal(RuntimeState* state); + Status FlushFinal(RuntimeState* state) override; /// Closes writers, output_exprs and partition_key_exprs and releases resources. /// The temporary files will be moved to their final destination by the Coordinator. 
- virtual void Close(RuntimeState* state); + void Close(RuntimeState* state) override; - int skip_header_line_count() const { return skip_header_line_count_; } - const vector<int32_t>& sort_columns() const { return sort_columns_; } - TSortingOrder::type sorting_order() const { return sorting_order_; } + const vector<int32_t>& sort_columns() const override { return sort_columns_; } + TSortingOrder::type sorting_order() const override { return sorting_order_; } const HdfsTableDescriptor& TableDesc() { return *table_desc_; } - const std::map<string, int64_t>& GetParquetBloomFilterColumns() const { + const std::map<string, int64_t>& GetParquetBloomFilterColumns() const override { return parquet_bloom_filter_columns_; } - RuntimeProfile::Counter* rows_inserted_counter() { return rows_inserted_counter_; } - RuntimeProfile::Counter* bytes_written_counter() { return bytes_written_counter_; } - RuntimeProfile::Counter* encode_timer() { return encode_timer_; } - RuntimeProfile::Counter* hdfs_write_timer() { return hdfs_write_timer_; } - RuntimeProfile::Counter* compress_timer() { return compress_timer_; } + bool is_overwrite() const override { return overwrite_; } + bool is_result_sink() const override { return is_result_sink_; } + std::string staging_dir() const override { return staging_dir_; } + int skip_header_line_count() const override { return skip_header_line_count_; } + int64_t write_id() const override { return write_id_; } - std::string DebugString() const; + std::string DebugString() const override; private: /// Build a map from partition key values to partition descriptor for multiple output @@ -154,25 +153,6 @@ class HdfsTableSink : public DataSink { std::vector<std::string>* raw_partition_names, string* external_partition_name); - /// Add a temporary file to an output partition. Files are created in a - /// temporary directory and then moved to the real partition directory by the - /// coordinator in a finalization step. 
The temporary file's current location - /// and final destination are recorded in the state parameter. - /// If this function fails, the tmp file is cleaned up. - Status CreateNewTmpFile(RuntimeState* state, OutputPartition* output_partition) - WARN_UNUSED_RESULT; - - /// Key is the concatenation of the evaluated dynamic_partition_key_exprs_ generated by - /// GetHashTblKey(). Maps to an OutputPartition and a vector of indices of the rows in - /// the current batch to insert into the partition. The PartitionPair owns the - /// OutputPartition via a unique_ptr so that the memory is freed as soon as the - /// PartitionPair is removed from the map. This is important, because the - /// PartitionPairs can have different lifetimes. For example, a clustered insert into a - /// partitioned table iterates over the partitions, so only one PartitionPairs is - /// in the map at any given time. - typedef std::pair<std::unique_ptr<OutputPartition>, std::vector<int32_t>> PartitionPair; - typedef boost::unordered_map<std::string, PartitionPair> PartitionMap; - /// Generates string key for hash_tbl_ as a concatenation of all evaluated exprs, /// evaluated against 'row'. The generated string is much shorter than the full Hdfs /// file name. @@ -189,65 +169,20 @@ class HdfsTableSink : public DataSink { const std::string& key, PartitionPair** partition_pair, bool no_more_rows) WARN_UNUSED_RESULT; - /// Sets hdfs_file_name and tmp_hdfs_file_name of given output partition. - /// The Hdfs directory is created from the target table's base Hdfs dir, - /// the partition_key_names_ and the evaluated partition_key_exprs_. - /// The Hdfs file name is the unique_id_str_. - void BuildHdfsFileNames(const HdfsPartitionDescriptor& partition_descriptor, - OutputPartition* output, const std::string &external_partition_path); - - /// Writes all rows referenced by the row index vector in 'partition_pair' to the - /// partition's writer and clears the row index vector afterwards. 
- Status WriteRowsToPartition( - RuntimeState* state, RowBatch* batch, PartitionPair* partition_pair) - WARN_UNUSED_RESULT; - /// Maps all rows in 'batch' to partitions and appends them to their temporary Hdfs /// files. The input must be ordered by the partition key expressions. Status WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch) WARN_UNUSED_RESULT; - /// Updates runtime stats of HDFS with rows written, then closes the file associated - /// with the partition by calling ClosePartitionFile() - Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition) - WARN_UNUSED_RESULT; - - /// Closes the hdfs file for this partition as well as the writer. - Status ClosePartitionFile(RuntimeState* state, OutputPartition* partition) - WARN_UNUSED_RESULT; - /// Returns the ith partition name of the table. std::string GetPartitionName(int i); - // Returns TRUE if the staging step should be skipped for this partition. This allows - // for faster INSERT query completion time for the S3A filesystem as the coordinator - // does not have to copy the file(s) from the staging locaiton to the final location. We - // do not skip for INSERT OVERWRITEs because the coordinator will delete all files in - // the final location before moving the staged files there, so we cannot write directly - // to the final location and need to write to the temporary staging location. - bool ShouldSkipStaging(RuntimeState* state, OutputPartition* partition); - - /// Returns TRUE if the target table is transactional. - bool IsTransactional() const { return IsHiveAcid() || IsIceberg(); } - /// Returns TRUE for Hive ACID tables. - bool IsHiveAcid() const { return write_id_ != -1; } - - /// Returns TRUE for Iceberg tables. - bool IsIceberg() const { return table_desc_->IsIcebergTable(); } - - /// Returns TRUE if an external output directory was provided. - bool HasExternalOutputDir() { return !external_output_dir_.empty(); } - - /// Descriptor of target table. Set in Prepare(). 
- const HdfsTableDescriptor* table_desc_; + bool IsHiveAcid() const override { return write_id_ != -1; } /// The partition descriptor used when creating new partitions from this sink. /// Currently we don't support multi-format sinks. const HdfsPartitionDescriptor* prototype_partition_; - /// Table id resolved in Prepare() to set tuple_desc_; - TableId table_id_; - /// The 'skip.header.line.count' property of the target Hdfs table. We will insert this /// many empty lines at the beginning of new text files, which will be skipped by the /// scanners while reading from the files. @@ -284,9 +219,6 @@ class HdfsTableSink : public DataSink { /// <hdfs_table_base_dir>/_impala_insert_staging/ during Prepare() std::string staging_dir_; - /// The directory in which an external FE expects results to be written to. - std::string external_output_dir_; - /// How deep into the partition specification in which to start creating partition // directories. Used in conjunction with external_output_dir_ to inform the table // sink which directories are pre-created. @@ -296,10 +228,6 @@ class HdfsTableSink : public DataSink { /// Parquet Bloom filtering is not enabled are not listed. std::map<std::string, int64_t> parquet_bloom_filter_columns_; - /// string representation of the unique fragment instance id. Used for per-partition - /// Hdfs file names, and for tmp Hdfs directories. Set in Prepare(); - std::string unique_id_str_; - /// Hash table of generated output partitions. /// Maps from a string representation of the dynamic_partition_key_exprs_ /// generated by GetHashTblKey() to its corresponding OutputPartition. 
@@ -322,18 +250,6 @@ class HdfsTableSink : public DataSink { typedef boost::unordered_map<std::string, HdfsPartitionDescriptor*> PartitionDescriptorMap; PartitionDescriptorMap partition_descriptor_map_; - - RuntimeProfile::Counter* partitions_created_counter_; - RuntimeProfile::Counter* files_created_counter_; - RuntimeProfile::Counter* rows_inserted_counter_; - RuntimeProfile::Counter* bytes_written_counter_; - - /// Time spent converting tuple to on disk format. - RuntimeProfile::Counter* encode_timer_; - /// Time spent writing to hdfs - RuntimeProfile::Counter* hdfs_write_timer_; - /// Time spent compressing data - RuntimeProfile::Counter* compress_timer_; /// Will the output of this sink be used for query results const bool is_result_sink_; }; diff --git a/be/src/exec/hdfs-table-writer.cc b/be/src/exec/hdfs-table-writer.cc index af4968420..c710ac6f0 100644 --- a/be/src/exec/hdfs-table-writer.cc +++ b/be/src/exec/hdfs-table-writer.cc @@ -26,7 +26,7 @@ namespace impala { -HdfsTableWriter::HdfsTableWriter(HdfsTableSink* parent, +HdfsTableWriter::HdfsTableWriter(TableSinkBase* parent, RuntimeState* state, OutputPartition* output, const HdfsPartitionDescriptor* partition_desc, const HdfsTableDescriptor* table_desc) : parent_(parent), diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h index 764f30e72..ae4d56589 100644 --- a/be/src/exec/hdfs-table-writer.h +++ b/be/src/exec/hdfs-table-writer.h @@ -29,7 +29,7 @@ namespace impala { class HdfsPartitionDescriptor; class HdfsTableDescriptor; -class HdfsTableSink; +class TableSinkBase; struct OutputPartition; class RowBatch; class RuntimeState; @@ -61,7 +61,7 @@ class HdfsTableWriter { /// output_partition -- Information on the output partition file. /// partition -- the descriptor for the partition being written /// table_desc -- the descriptor for the table being written. 
- HdfsTableWriter(HdfsTableSink* parent, + HdfsTableWriter(TableSinkBase* parent, RuntimeState* state, OutputPartition* output_partition, const HdfsPartitionDescriptor* partition_desc, const HdfsTableDescriptor* table_desc); @@ -132,7 +132,7 @@ class HdfsTableWriter { } /// Parent table sink object - HdfsTableSink* parent_; + TableSinkBase* parent_; /// Runtime state. RuntimeState* state_; diff --git a/be/src/exec/iceberg-delete-sink.cc b/be/src/exec/iceberg-delete-sink.cc new file mode 100644 index 000000000..ccbef01f3 --- /dev/null +++ b/be/src/exec/iceberg-delete-sink.cc @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "exec/iceberg-delete-sink.h" + +#include "common/object-pool.h" +#include "exec/parquet/hdfs-parquet-table-writer.h" +#include "exprs/scalar-expr.h" +#include "runtime/descriptors.h" +#include "runtime/hdfs-fs-cache.h" +#include "runtime/mem-tracker.h" +#include "runtime/row-batch.h" +#include "runtime/runtime-state.h" +#include "util/debug-util.h" +#include "util/hdfs-util.h" +#include "util/impalad-metrics.h" +#include "util/metrics.h" +#include "util/runtime-profile-counters.h" + +#include "common/names.h" + +namespace impala { + +IcebergDeleteSink::IcebergDeleteSink(TDataSinkId sink_id, + const IcebergDeleteSinkConfig& sink_config, const TIcebergDeleteSink& ice_del_sink, + RuntimeState* state) : TableSinkBase(sink_id, sink_config, + "IcebergDeleteSink", state) { +} + +DataSink* IcebergDeleteSinkConfig::CreateSink(RuntimeState* state) const { + TDataSinkId sink_id = state->fragment().idx; + return state->obj_pool()->Add( + new IcebergDeleteSink(sink_id, *this, + this->tsink_->table_sink.iceberg_delete_sink, state)); +} + +void IcebergDeleteSinkConfig::Close() { + DataSinkConfig::Close(); +} + +Status IcebergDeleteSinkConfig::Init( + const TDataSink& tsink, const RowDescriptor* input_row_desc, FragmentState* state) { + RETURN_IF_ERROR(DataSinkConfig::Init(tsink, input_row_desc, state)); + DCHECK(tsink_->__isset.table_sink); + return Status::OK(); +} + +Status IcebergDeleteSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) { + SCOPED_TIMER(profile()->total_time_counter()); + RETURN_IF_ERROR(TableSinkBase::Prepare(state, parent_mem_tracker)); + unique_id_str_ = "delete-" + PrintId(state->fragment_instance_id(), "-"); + + // Resolve table id and set input tuple descriptor. 
+ table_desc_ = static_cast<const HdfsTableDescriptor*>( + state->desc_tbl().GetTableDescriptor(table_id_)); + if (table_desc_ == nullptr) { + stringstream error_msg("Failed to get table descriptor for table id: "); + error_msg << table_id_; + return Status(error_msg.str()); + } + + DCHECK_GE(output_expr_evals_.size(), + table_desc_->num_cols() - table_desc_->num_clustering_cols()) << DebugString(); + + return Status::OK(); +} + +Status IcebergDeleteSink::Open(RuntimeState* state) { + SCOPED_TIMER(profile()->total_time_counter()); + RETURN_IF_ERROR(TableSinkBase::Open(state)); + prototype_partition_ = CHECK_NOTNULL(table_desc_->prototype_partition_descriptor()); + output_partition_ = make_pair(make_unique<OutputPartition>(), std::vector<int32_t>()); + state->dml_exec_state()->AddPartition( + output_partition_.first->partition_name, prototype_partition_->id(), + &table_desc_->hdfs_base_dir(), nullptr); + return Status::OK(); +} + +Status IcebergDeleteSink::Send(RuntimeState* state, RowBatch* batch) { + SCOPED_TIMER(profile()->total_time_counter()); + RETURN_IF_ERROR(state->CheckQueryState()); + // We don't do any work for an empty batch. + if (batch->num_rows() == 0) return Status::OK(); + + if (output_partition_.first->writer == nullptr) { + RETURN_IF_ERROR(InitOutputPartition(state)); + } + + RETURN_IF_ERROR(WriteRowsToPartition(state, batch, &output_partition_)); + + return Status(); +} + +Status IcebergDeleteSink::InitOutputPartition(RuntimeState* state) { + // Build the unique name for this partition from the partition keys, e.g. "j=1/f=foo/" + // etc. + stringstream partition_name_ss; + + // partition_name_ss now holds the unique descriptor for this partition, + output_partition_.first->partition_name = partition_name_ss.str(); + BuildHdfsFileNames(*prototype_partition_, output_partition_.first.get(), ""); + + // We will be writing to the final file if we're skipping staging, so get a connection + // to its filesystem. 
+ RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection( + output_partition_.first->final_hdfs_file_name_prefix, + &output_partition_.first->hdfs_connection)); + + output_partition_.first->partition_descriptor = prototype_partition_; + + output_partition_.first->writer.reset( + new HdfsParquetTableWriter( + this, state, output_partition_.first.get(), prototype_partition_, table_desc_)); + RETURN_IF_ERROR(output_partition_.first->writer->Init()); + COUNTER_ADD(partitions_created_counter_, 1); + return CreateNewTmpFile(state, output_partition_.first.get()); +} + +Status IcebergDeleteSink::FlushFinal(RuntimeState* state) { + DCHECK(!closed_); + SCOPED_TIMER(profile()->total_time_counter()); + + RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition_.first.get())); + return Status::OK(); +} + +void IcebergDeleteSink::Close(RuntimeState* state) { + if (closed_) return; + SCOPED_TIMER(profile()->total_time_counter()); + + if (output_partition_.first->writer != nullptr) { + output_partition_.first->writer->Close(); + } + Status close_status = ClosePartitionFile(state, output_partition_.first.get()); + if (!close_status.ok()) state->LogError(close_status.msg()); + + output_partition_.first.reset(); + TableSinkBase::Close(state); + closed_ = true; +} + +string IcebergDeleteSink::DebugString() const { + stringstream out; + out << "IcebergDeleteSink(" + << " table_desc=" << table_desc_->DebugString() + << " output_exprs=" << ScalarExpr::DebugString(output_exprs_) + << ")"; + return out.str(); +} + +} // namespace impala diff --git a/be/src/exec/iceberg-delete-sink.h b/be/src/exec/iceberg-delete-sink.h new file mode 100644 index 000000000..7e6b232aa --- /dev/null +++ b/be/src/exec/iceberg-delete-sink.h @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exec/output-partition.h" +#include "exec/table-sink-base.h" + +#include <unordered_map> + +namespace impala { + +class Expr; +class TupleDescriptor; +class TupleRow; +class RuntimeState; +class MemTracker; + +class IcebergDeleteSinkConfig : public DataSinkConfig { + public: + DataSink* CreateSink(RuntimeState* state) const override; + void Close() override; + + /// Expressions for computing the target partitions to which a row is written. + std::vector<ScalarExpr*> partition_key_exprs_; + + ~IcebergDeleteSinkConfig() override {} + + protected: + Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc, + FragmentState* state) override; +}; + +class IcebergDeleteSink : public TableSinkBase { + public: + IcebergDeleteSink(TDataSinkId sink_id, const IcebergDeleteSinkConfig& sink_config, + const TIcebergDeleteSink& hdfs_sink, RuntimeState* state); + + /// Prepares output_exprs and partition_key_exprs, and connects to HDFS. + Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override; + + /// Opens output_exprs and partition_key_exprs, prepares the single output partition for + /// static inserts, and populates partition_descriptor_map_. + Status Open(RuntimeState* state) override; + + /// Append all rows in batch to the temporary Hdfs files corresponding to partitions. 
+ Status Send(RuntimeState* state, RowBatch* batch) override; + + /// Finalize any open files. + /// TODO: IMPALA-2988: Move calls to functions that can fail in Close() to FlushFinal() + Status FlushFinal(RuntimeState* state) override; + + /// Closes writers, output_exprs and partition_key_exprs and releases resources. + /// The temporary files will be moved to their final destination by the Coordinator. + void Close(RuntimeState* state) override; + + TSortingOrder::type sorting_order() const override { return TSortingOrder::LEXICAL; } + + std::string DebugString() const override; + + private: + /// Initialises the filenames of a given output partition, and opens the temporary file. + Status InitOutputPartition(RuntimeState* state) WARN_UNUSED_RESULT; + + /// For now we only allow non-partitioned Iceberg tables. + PartitionPair output_partition_; + + /// The partition descriptor used when creating new partitions from this sink. + /// Currently we don't support multi-format sinks. + const HdfsPartitionDescriptor* prototype_partition_; +}; + +} + + diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.cc b/be/src/exec/parquet/hdfs-parquet-table-writer.cc index a5f604499..07a33df66 100644 --- a/be/src/exec/parquet/hdfs-parquet-table-writer.cc +++ b/be/src/exec/parquet/hdfs-parquet-table-writer.cc @@ -136,8 +136,9 @@ class HdfsParquetTableWriter::BaseColumnWriter { row_group_stats_base_(nullptr), table_sink_mem_tracker_(parent_->parent_->mem_tracker()), column_name_(std::move(column_name)) { - static_assert(std::is_same<decltype(parent_->parent_), HdfsTableSink*>::value, - "'table_sink_mem_tracker_' must point to the mem tracker of an HdfsTableSink"); + static_assert(std::is_base_of_v<TableSinkBase, + std::remove_reference_t<decltype(*parent_->parent_)>>, + "'table_sink_mem_tracker_' must point to the mem tracker of a TableSinkBase"); def_levels_ = parent_->state_->obj_pool()->Add( new RleEncoder(parent_->reusable_col_mem_pool_->Allocate(DEFAULT_DATA_PAGE_SIZE), 
DEFAULT_DATA_PAGE_SIZE, 1)); @@ -1231,7 +1232,8 @@ void HdfsParquetTableWriter::BaseColumnWriter::NewPage() { page_stats_base_->Reset(); } -HdfsParquetTableWriter::HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeState* state, +HdfsParquetTableWriter:: +HdfsParquetTableWriter(TableSinkBase* parent, RuntimeState* state, OutputPartition* output, const HdfsPartitionDescriptor* part_desc, const HdfsTableDescriptor* table_desc) : HdfsTableWriter(parent, state, output, part_desc, table_desc), diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.h b/be/src/exec/parquet/hdfs-parquet-table-writer.h index 862b9cbf0..c76dc2cef 100644 --- a/be/src/exec/parquet/hdfs-parquet-table-writer.h +++ b/be/src/exec/parquet/hdfs-parquet-table-writer.h @@ -19,7 +19,7 @@ #ifndef IMPALA_EXEC_HDFS_PARQUET_TABLE_WRITER_H #define IMPALA_EXEC_HDFS_PARQUET_TABLE_WRITER_H -#include "exec/data-sink.h" +#include "exec/table-sink-base.h" #include <hdfs.h> #include <map> @@ -53,7 +53,7 @@ class TupleRow; class HdfsParquetTableWriter : public HdfsTableWriter { public: - HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeState* state, + HdfsParquetTableWriter(TableSinkBase* parent, RuntimeState* state, OutputPartition* output_partition, const HdfsPartitionDescriptor* part_desc, const HdfsTableDescriptor* table_desc); diff --git a/be/src/exec/table-sink-base.cc b/be/src/exec/table-sink-base.cc new file mode 100644 index 000000000..09551d0b1 --- /dev/null +++ b/be/src/exec/table-sink-base.cc @@ -0,0 +1,293 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/table-sink-base.h" + +#include "exec/output-partition.h" +#include "runtime/runtime-state.h" +#include "util/hdfs-util.h" +#include "util/impalad-metrics.h" +#include "util/metrics.h" +#include "util/string-util.h" +#include "util/runtime-profile-counters.h" + +#include "common/names.h" + +namespace impala { + +Status TableSinkBase::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) { + RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker)); + partitions_created_counter_ = ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT); + files_created_counter_ = ADD_COUNTER(profile(), "FilesCreated", TUnit::UNIT); + rows_inserted_counter_ = ADD_COUNTER(profile(), "RowsInserted", TUnit::UNIT); + bytes_written_counter_ = ADD_COUNTER(profile(), "BytesWritten", TUnit::BYTES); + encode_timer_ = ADD_TIMER(profile(), "EncodeTimer"); + hdfs_write_timer_ = ADD_TIMER(profile(), "HdfsWriteTimer"); + compress_timer_ = ADD_TIMER(profile(), "CompressTimer"); + return Status::OK(); +} + +Status TableSinkBase::ClosePartitionFile( + RuntimeState* state, OutputPartition* partition) { + if (partition->tmp_hdfs_file == nullptr) return Status::OK(); + int hdfs_ret = hdfsCloseFile(partition->hdfs_connection, partition->tmp_hdfs_file); + VLOG_FILE << "hdfsCloseFile() file=" << partition->current_file_name; + partition->tmp_hdfs_file = nullptr; + ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(-1); + if (hdfs_ret != 0) { + return Status(ErrorMsg(TErrorCode::GENERAL, + GetHdfsErrorMsg("Failed to close HDFS file: ", + 
partition->current_file_name))); + } + return Status::OK(); +} + +void TableSinkBase::BuildHdfsFileNames( + const HdfsPartitionDescriptor& partition_descriptor, + OutputPartition* output_partition, const string &external_partition_name) { + + // Create final_hdfs_file_name_prefix and tmp_hdfs_file_name_prefix. + // Path: <hdfs_base_dir>/<partition_values>/<unique_id_str> + // Or, for transactional tables: + // Path: <hdfs_base_dir>/<partition_values>/<transaction_directory>/<unique_id_str> + // Where <transaction_directory> is either a 'base' or a 'delta' directory in Hive ACID + // terminology. + + // Temporary files are written under the following path which is unique to this sink: + // <table_dir>/_impala_insert_staging/<query_id>/<per_fragment_unique_id>_dir/ + // Both the temporary directory and the file name, when moved to the real partition + // directory must be unique. + // Prefix the directory name with "." to make it hidden and append "_dir" at the end + // of the directory to avoid name clashes for unpartitioned tables. + // The files are located in <partition_values>/<random_value>_data under + // tmp_hdfs_file_name_prefix. + + // Use the query id as filename. + const string& query_suffix = Substitute("$0_$1_data", unique_id_str_, rand()); + + output_partition->tmp_hdfs_dir_name = + Substitute("$0/.$1_$2_dir/", staging_dir(), unique_id_str_, rand()); + output_partition->tmp_hdfs_file_name_prefix = Substitute("$0$1/$2", + output_partition->tmp_hdfs_dir_name, output_partition->partition_name, + query_suffix); + + if (HasExternalOutputDir()) { + // When an external FE has provided a staging directory we use that directly. + // We are trusting that the external frontend implementation has done appropriate + // authorization checks on the external output directory. 
+ output_partition->final_hdfs_file_name_prefix = Substitute("$0/$1/", + external_output_dir_, external_partition_name); + } else if (partition_descriptor.location().empty()) { + output_partition->final_hdfs_file_name_prefix = Substitute("$0/$1/", + table_desc_->hdfs_base_dir(), output_partition->partition_name); + } else { + // If the partition descriptor has a location (as set by alter table add partition + // with a location clause), that provides the complete directory path for this + // partition. No partition key suffix ("p=1/j=foo/") should be added. + output_partition->final_hdfs_file_name_prefix = + Substitute("$0/", partition_descriptor.location()); + } + if (IsHiveAcid()) { + if (HasExternalOutputDir()) { + // The 0 padding on base and delta is to match the behavior of Hive since various + // systems will expect a certain format for dynamic partition creation. Additionally + // include an 0 statement id for delta directory so various Hive AcidUtils detect + // the directory (such as AcidUtils.baseOrDeltaSubdir()). Multiple statements in a + // single transaction is not supported. + if (is_overwrite()) { + output_partition->final_hdfs_file_name_prefix += StringPrintf("/base_%07ld/", + write_id()); + } else { + output_partition->final_hdfs_file_name_prefix += StringPrintf( + "/delta_%07ld_%07ld_0000/", write_id(), write_id()); + } + } else { + string acid_dir = Substitute( + is_overwrite() ? "/base_$0/" : "/delta_$0_$0/", write_id()); + output_partition->final_hdfs_file_name_prefix += acid_dir; + } + } + if (IsIceberg()) { + //TODO: implement LocationProviders. 
+ if (output_partition->partition_name.empty()) { + output_partition->final_hdfs_file_name_prefix = + Substitute("$0/data/", table_desc_->IcebergTableLocation()); + } else { + output_partition->final_hdfs_file_name_prefix = + Substitute("$0/data/$1/", table_desc_->IcebergTableLocation(), + output_partition->partition_name); + } + } + output_partition->final_hdfs_file_name_prefix += query_suffix; + output_partition->num_files = 0; +} + +Status TableSinkBase::CreateNewTmpFile(RuntimeState* state, + OutputPartition* output_partition) { + SCOPED_TIMER(ADD_TIMER(profile(), "TmpFileCreateTimer")); + string file_name_pattern = + output_partition->writer->file_extension().empty() ? "$0.$1" : "$0.$1.$2"; + string final_location = Substitute(file_name_pattern, + output_partition->final_hdfs_file_name_prefix, output_partition->num_files, + output_partition->writer->file_extension()); + + // If ShouldSkipStaging() is true, then the table sink will write the file(s) for this + // partition to the final location directly. If it is false, the file(s) will be written + // to a temporary staging location which will be moved by the coordinator to the final + // location. + if (ShouldSkipStaging(state, output_partition)) { + output_partition->current_file_name = final_location; + output_partition->current_file_final_name = ""; + } else { + output_partition->current_file_name = Substitute(file_name_pattern, + output_partition->tmp_hdfs_file_name_prefix, output_partition->num_files, + output_partition->writer->file_extension()); + // Save the ultimate destination for this file (it will be moved by the coordinator). + output_partition->current_file_final_name = final_location; + } + // Check if tmp_hdfs_file_name exists. 
+ const char* tmp_hdfs_file_name_cstr = + output_partition->current_file_name.c_str(); + + if (hdfsExists(output_partition->hdfs_connection, tmp_hdfs_file_name_cstr) == 0) { + return Status(GetHdfsErrorMsg("Temporary HDFS file already exists: ", + output_partition->current_file_name)); + } + + // This is the block size from the HDFS partition metadata. + uint64_t block_size = output_partition->partition_descriptor->block_size(); + // hdfsOpenFile takes a 4 byte integer as the block size. + if (block_size > numeric_limits<int32_t>::max()) { + return Status(Substitute("HDFS block size must be smaller than 2GB but is configured " + "in the HDFS partition to $0.", block_size)); + } + + if (block_size == 0) block_size = output_partition->writer->default_block_size(); + if (block_size > numeric_limits<int32_t>::max()) { + return Status(Substitute("HDFS block size must be smaller than 2GB but the target " + "table requires $0.", block_size)); + } + + DCHECK_LE(block_size, numeric_limits<int32_t>::max()); + output_partition->tmp_hdfs_file = hdfsOpenFile(output_partition->hdfs_connection, + tmp_hdfs_file_name_cstr, O_WRONLY, 0, 0, block_size); + + VLOG_FILE << "hdfsOpenFile() file=" << tmp_hdfs_file_name_cstr; + if (output_partition->tmp_hdfs_file == nullptr) { + return Status(GetHdfsErrorMsg("Failed to open HDFS file for writing: ", + output_partition->current_file_name)); + } + + if (IsS3APath(tmp_hdfs_file_name_cstr) || + IsABFSPath(tmp_hdfs_file_name_cstr) || + IsADLSPath(tmp_hdfs_file_name_cstr) || + IsOSSPath(tmp_hdfs_file_name_cstr) || + IsGcsPath(tmp_hdfs_file_name_cstr) || + IsCosPath(tmp_hdfs_file_name_cstr) || + IsSFSPath(tmp_hdfs_file_name_cstr) || + IsOzonePath(tmp_hdfs_file_name_cstr)) { + // On S3A, the file cannot be stat'ed until after it's closed, and even so, the block + // size reported will be just the filesystem default. Similarly, the block size + // reported for ADLS will be the filesystem default. So, remember the requested block + // size. 
+ // TODO: IMPALA-9437: Ozone does not support stat'ing a file until after it's closed, + // so for now skip the call to hdfsGetPathInfo. + output_partition->block_size = block_size; + } else { + // HDFS may choose to override the block size that we've recommended, so for non-S3 + // files, we get the block size by stat-ing the file. + hdfsFileInfo* info = hdfsGetPathInfo(output_partition->hdfs_connection, + output_partition->current_file_name.c_str()); + if (info == nullptr) { + return Status(GetHdfsErrorMsg("Failed to get info on temporary HDFS file: ", + output_partition->current_file_name)); + } + output_partition->block_size = info->mBlockSize; + hdfsFreeFileInfo(info, 1); + } + + ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(1); + COUNTER_ADD(files_created_counter_, 1); + + ++output_partition->num_files; + output_partition->current_file_rows = 0; + Status status = output_partition->writer->InitNewFile(); + if (!status.ok()) { + status.MergeStatus(ClosePartitionFile(state, output_partition)); + hdfsDelete(output_partition->hdfs_connection, + output_partition->current_file_name.c_str(), 0); + } + return status; +} + +Status TableSinkBase::WriteRowsToPartition( + RuntimeState* state, RowBatch* batch, PartitionPair* partition_pair) { + // The rows of this batch may span multiple files. We repeatedly pass the row batch to + // the writer until it sets new_file to false, indicating that all rows have been + // written. The writer tracks where it is in the batch when it returns with new_file + // set. + bool new_file; + while (true) { + OutputPartition* output_partition = partition_pair->first.get(); + Status status = + output_partition->writer->AppendRows(batch, partition_pair->second, &new_file); + if (!status.ok()) { + // IMPALA-10607: Deletes partition file if staging is skipped when appending rows + // fails. Otherwise, it leaves the file in un-finalized state. 
+ if (ShouldSkipStaging(state, output_partition)) { + status.MergeStatus(ClosePartitionFile(state, output_partition)); + hdfsDelete(output_partition->hdfs_connection, + output_partition->current_file_name.c_str(), 0); + } + return status; + } + if (!new_file) break; + RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition)); + RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition)); + } + partition_pair->second.clear(); + return Status::OK(); +} + +bool TableSinkBase::ShouldSkipStaging(RuntimeState* state, OutputPartition* partition) { + if (IsTransactional() || HasExternalOutputDir() || is_result_sink()) return true; + // We skip staging if we are writing query results + return (IsS3APath(partition->final_hdfs_file_name_prefix.c_str()) && !is_overwrite() && + state->query_options().s3_skip_insert_staging); +} + +Status TableSinkBase::FinalizePartitionFile( + RuntimeState* state, OutputPartition* partition) { + if (partition->tmp_hdfs_file == nullptr && !is_overwrite()) return Status::OK(); + SCOPED_TIMER(ADD_TIMER(profile(), "FinalizePartitionFileTimer")); + + // OutputPartition writer could be nullptr if there is no row to output. + if (partition->writer.get() != nullptr) { + RETURN_IF_ERROR(partition->writer->Finalize()); + state->dml_exec_state()->UpdatePartition( + partition->partition_name, partition->current_file_rows, + &partition->writer->stats()); + state->dml_exec_state()->AddCreatedFile(*partition, IsIceberg(), + partition->writer->iceberg_file_stats()); + } + + RETURN_IF_ERROR(ClosePartitionFile(state, partition)); + return Status::OK(); +} + +} + diff --git a/be/src/exec/table-sink-base.h b/be/src/exec/table-sink-base.h new file mode 100644 index 000000000..b82f74d35 --- /dev/null +++ b/be/src/exec/table-sink-base.h @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exec/data-sink.h" + +#include "runtime/descriptors.h" + +namespace impala { + +class TableSinkBase : public DataSink { +public: + TableSinkBase(TDataSinkId sink_id, const DataSinkConfig& sink_config, + const std::string& name, RuntimeState* state) : + DataSink(sink_id, sink_config, name, state), + table_id_(sink_config.tsink_->table_sink.target_table_id) {} + + virtual bool is_overwrite() const { return false; } + virtual bool is_result_sink() const { return false; } + virtual int64_t write_id() const { return -1; } + virtual std::string staging_dir() const { return ""; } + virtual int skip_header_line_count() const { return 0; } + virtual TSortingOrder::type sorting_order() const = 0; + virtual const vector<int32_t>& sort_columns() const { + static vector<int32_t> dummy; + return dummy; + } + virtual const std::map<string, int64_t>& GetParquetBloomFilterColumns() const { + static std::map<string, int64_t> dummy; + return dummy; + } + + Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker); + + RuntimeProfile::Counter* rows_inserted_counter() { return rows_inserted_counter_; } + RuntimeProfile::Counter* bytes_written_counter() { return bytes_written_counter_; } + RuntimeProfile::Counter* encode_timer() { return encode_timer_; } + 
RuntimeProfile::Counter* hdfs_write_timer() { return hdfs_write_timer_; } + RuntimeProfile::Counter* compress_timer() { return compress_timer_; } + + virtual std::string DebugString() const = 0; +protected: + /// Key is the concatenation of the evaluated dynamic_partition_key_exprs_ generated by + /// GetHashTblKey(). Maps to an OutputPartition and a vector of indices of the rows in + /// the current batch to insert into the partition. The PartitionPair owns the + /// OutputPartition via a unique_ptr so that the memory is freed as soon as the + /// PartitionPair is removed from the map. This is important, because the + /// PartitionPairs can have different lifetimes. For example, a clustered insert into a + /// partitioned table iterates over the partitions, so only one PartitionPairs is + /// in the map at any given time. + typedef std::pair<std::unique_ptr<OutputPartition>, std::vector<int32_t>> PartitionPair; + typedef boost::unordered_map<std::string, PartitionPair> PartitionMap; + + /// Returns TRUE if the target table is transactional. + bool IsTransactional() const { return IsHiveAcid() || IsIceberg(); } + + virtual bool IsHiveAcid() const { return false; } + + /// Sets hdfs_file_name and tmp_hdfs_file_name of given output partition. + /// The Hdfs directory is created from the target table's base Hdfs dir, + /// the partition_key_names_ and the evaluated partition_key_exprs_. + /// The Hdfs file name is the unique_id_str_. + void BuildHdfsFileNames(const HdfsPartitionDescriptor& partition_descriptor, + OutputPartition* output, const std::string &external_partition_path); + + /// Add a temporary file to an output partition. Files are created in a + /// temporary directory and then moved to the real partition directory by the + /// coordinator in a finalization step. The temporary file's current location + /// and final destination are recorded in the state parameter. + /// If this function fails, the tmp file is cleaned up. 
+ Status CreateNewTmpFile(RuntimeState* state, OutputPartition* output_partition) + WARN_UNUSED_RESULT; + + /// Updates runtime stats of HDFS with rows written, then closes the file associated + /// with the partition by calling ClosePartitionFile() + Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition) + WARN_UNUSED_RESULT; + + /// Writes all rows referenced by the row index vector in 'partition_pair' to the + /// partition's writer and clears the row index vector afterwards. + Status WriteRowsToPartition( + RuntimeState* state, RowBatch* batch, PartitionPair* partition_pair) + WARN_UNUSED_RESULT; + + /// Closes the hdfs file for this partition as well as the writer. + Status ClosePartitionFile(RuntimeState* state, OutputPartition* partition) + WARN_UNUSED_RESULT; + + // Returns TRUE if the staging step should be skipped for this partition. This allows + // for faster INSERT query completion time for the S3A filesystem as the coordinator + // does not have to copy the file(s) from the staging locaiton to the final location. We + // do not skip for INSERT OVERWRITEs because the coordinator will delete all files in + // the final location before moving the staged files there, so we cannot write directly + // to the final location and need to write to the temporary staging location. + bool ShouldSkipStaging(RuntimeState* state, OutputPartition* partition); + + /// Returns TRUE for Iceberg tables. + bool IsIceberg() const { return table_desc_->IsIcebergTable(); } + + /// Returns TRUE if an external output directory was provided. + bool HasExternalOutputDir() { return !external_output_dir_.empty(); } + + /// Table id resolved in Prepare() to set tuple_desc_; + TableId table_id_; + + /// string representation of the unique fragment instance id. Used for per-partition + /// Hdfs file names, and for tmp Hdfs directories. Set in Prepare(); + std::string unique_id_str_; + + /// Descriptor of target table. Set in Prepare(). 
+ const HdfsTableDescriptor* table_desc_ = nullptr; + + /// The directory in which an external FE expects results to be written to. + std::string external_output_dir_; + + RuntimeProfile::Counter* partitions_created_counter_; + RuntimeProfile::Counter* files_created_counter_; + RuntimeProfile::Counter* rows_inserted_counter_; + RuntimeProfile::Counter* bytes_written_counter_; + + /// Time spent converting tuple to on disk format. + RuntimeProfile::Counter* encode_timer_; + /// Time spent writing to hdfs + RuntimeProfile::Counter* hdfs_write_timer_; + /// Time spent compressing data + RuntimeProfile::Counter* compress_timer_; +}; + +} diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc index df9257c98..8bfdf0c3a 100644 --- a/be/src/runtime/coordinator.cc +++ b/be/src/runtime/coordinator.cc @@ -853,7 +853,7 @@ Status Coordinator::FinalizeHdfsDml() { DCHECK(has_called_wait_.Load()); DCHECK(finalize_params() != nullptr); bool is_hive_acid = finalize_params()->__isset.write_id; - bool is_iceberg_table = finalize_params()->__isset.spec_id; + bool is_iceberg_table = finalize_params()->__isset.iceberg_params; VLOG_QUERY << "Finalizing query: " << PrintId(query_id()); SCOPED_TIMER(finalization_timer_); diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc index 872cd7be2..5b69b9a1b 100644 --- a/be/src/runtime/descriptors.cc +++ b/be/src/runtime/descriptors.cc @@ -259,6 +259,7 @@ HdfsTableDescriptor::HdfsTableDescriptor(const TTableDescriptor& tdesc, ObjectPo iceberg_table_location_ = tdesc.icebergTable.table_location; const TIcebergPartitionSpec& spec = tdesc.icebergTable.partition_spec[ tdesc.icebergTable.default_partition_spec_id]; + DCHECK_EQ(spec.spec_id, tdesc.icebergTable.default_partition_spec_id); for (const TIcebergPartitionField& spec_field : spec.partition_fields) { if (spec_field.transform.transform_type == TIcebergPartitionTransformType::VOID) { continue; diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc index b59c27e1d..7d57d8745 100644 --- a/be/src/service/client-request-state.cc +++ b/be/src/service/client-request-state.cc @@ -1550,14 +1550,23 @@ Status ClientRequestState::UpdateCatalog() { catalog_update.__set_transaction_id(finalize_params.transaction_id); catalog_update.__set_write_id(finalize_params.write_id); } - if (finalize_params.__isset.spec_id) { + if (finalize_params.__isset.iceberg_params) { + const TIcebergDmlFinalizeParams& ice_finalize_params = + finalize_params.iceberg_params; + TIcebergOperationParam& cat_ice_op = catalog_update.iceberg_operation; catalog_update.__isset.iceberg_operation = true; - TIcebergOperationParam& ice_op = catalog_update.iceberg_operation; - ice_op.__set_spec_id(finalize_params.spec_id); - ice_op.__set_iceberg_data_files_fb( - dml_exec_state->CreateIcebergDataFilesVector()); - ice_op.__set_is_overwrite(finalize_params.is_overwrite); - ice_op.__set_initial_snapshot_id(finalize_params.initial_snapshot_id); + cat_ice_op.__set_operation(ice_finalize_params.operation); + cat_ice_op.__set_initial_snapshot_id( + ice_finalize_params.initial_snapshot_id); + cat_ice_op.__set_spec_id(ice_finalize_params.spec_id); + if (ice_finalize_params.operation == TIcebergOperation::INSERT) { + cat_ice_op.__set_iceberg_data_files_fb( + dml_exec_state->CreateIcebergDataFilesVector()); + cat_ice_op.__set_is_overwrite(finalize_params.is_overwrite); + } else if (ice_finalize_params.operation == TIcebergOperation::DELETE) { + cat_ice_op.__set_iceberg_delete_files_fb( + dml_exec_state->CreateIcebergDataFilesVector()); + } } Status cnxn_status; diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift index 467191450..eb69c3df2 100644 --- a/common/thrift/CatalogObjects.thrift +++ b/common/thrift/CatalogObjects.thrift @@ -603,7 +603,7 @@ struct TIcebergPartitionField { } struct TIcebergPartitionSpec { - 1: required i32 partition_id + 1: required i32 spec_id 2: optional 
list<TIcebergPartitionField> partition_fields } diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift index 6606564f6..ceb244227 100644 --- a/common/thrift/CatalogService.thrift +++ b/common/thrift/CatalogService.thrift @@ -203,13 +203,19 @@ struct TDdlExecResponse { 6: optional string table_location } + // Parameters for the Iceberg operation. struct TIcebergOperationParam { + 5: required Types.TIcebergOperation operation + // Iceberg partition spec used by this operation 1: optional i32 spec_id; // Iceberg data files to append to the table, encoded in FlatBuffers. - 2: required list<binary> iceberg_data_files_fb; + 2: optional list<binary> iceberg_data_files_fb; + + // Iceberg delete files to append to the table, encoded in FlatBuffers. + 6: optional list<binary> iceberg_delete_files_fb; // Is overwrite operation 3: required bool is_overwrite = false; diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift index 03f6a698a..3076c7ccb 100644 --- a/common/thrift/DataSinks.thrift +++ b/common/thrift/DataSinks.thrift @@ -103,6 +103,9 @@ struct THdfsTableSink { 11: optional map<string, i64> parquet_bloom_filter_col_info; } +struct TIcebergDeleteSink { +} + // Structure to encapsulate specific options that are passed down to the KuduTableSink struct TKuduTableSink { // The position in this vector is equal to the position in the output expressions of the @@ -151,6 +154,7 @@ struct TTableSink { 3: required TSinkAction action 4: optional THdfsTableSink hdfs_table_sink 5: optional TKuduTableSink kudu_table_sink + 6: optional TIcebergDeleteSink iceberg_delete_sink } struct TDataSink { diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index 2e0157a96..e57608587 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -831,8 +831,19 @@ struct TPlanExecInfo { per_node_scan_ranges } +struct TIcebergDmlFinalizeParams { + // Type of the Iceberg operation + 1: required 
Types.TIcebergOperation operation + + // Stores the Iceberg spec id of the partition spec used for this DML operation. + 2: optional i32 spec_id; + + // Stores the Iceberg snapshot id of the target table for this DML operation. + 3: optional i64 initial_snapshot_id; +} + // Metadata required to finalize a query - that is, to clean up after the query is done. -// Only relevant for INSERT queries. +// Only relevant for DML statements. struct TFinalizeParams { // True if the INSERT query was OVERWRITE, rather than INTO 1: required bool is_overwrite @@ -861,11 +872,8 @@ struct TFinalizeParams { // Stores the ACID write id of the target table for transactional INSERTs. 8: optional i64 write_id; - // Stores the Iceberg spec id of the partition spec used for this INSERT. - 9: optional i32 spec_id; - - // Stores the Iceberg snapshot id of the target table for INSERTs. - 10: optional i64 initial_snapshot_id; + // Stores params for Iceberg operation + 9: optional TIcebergDmlFinalizeParams iceberg_params; } // Result of call to ImpalaPlanService/JniFrontend.CreateQueryRequest() diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift index ecd78d5e7..e10ed0543 100644 --- a/common/thrift/Types.thrift +++ b/common/thrift/Types.thrift @@ -111,6 +111,11 @@ enum TStmtType { TESTCASE = 7 } +enum TIcebergOperation { + INSERT = 0, + DELETE = 1 +} + // Level of verboseness for "explain" output. 
enum TExplainLevel { MINIMAL = 0 diff --git a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java index c4561152e..5d7031c2b 100644 --- a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java @@ -58,7 +58,8 @@ public class DeleteStmt extends ModifyStmt { Preconditions.checkState(table_ != null); TableSink tableSink = TableSink.create(table_, TableSink.Op.DELETE, ImmutableList.<Expr>of(), resultExprs, referencedColumns_, false, false, - new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1, kuduTxnToken_, + new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1, + getKuduTransactionToken(), 0); Preconditions.checkState(!referencedColumns_.isEmpty()); return tableSink; diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java index f56d48441..8c937178d 100644 --- a/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java +++ b/fe/src/main/java/org/apache/impala/analysis/IcebergPartitionSpec.java @@ -36,13 +36,13 @@ import org.apache.impala.thrift.TIcebergPartitionSpec; * ) */ public class IcebergPartitionSpec extends StmtNode { - // Partition id from iceberg PartitionSpec - private int partitionId_; + // Partition spec id from iceberg PartitionSpec + private int specId_; private List<IcebergPartitionField> icebergPartitionFields_; public IcebergPartitionSpec(int partitionId, List<IcebergPartitionField> fields) { - partitionId_ = partitionId; + specId_ = partitionId; icebergPartitionFields_ = fields; } @@ -94,7 +94,7 @@ public class IcebergPartitionSpec extends StmtNode { public TIcebergPartitionSpec toThrift() { TIcebergPartitionSpec result = new TIcebergPartitionSpec(); - result.setPartition_id(partitionId_); + result.setSpec_id(specId_); if (!hasPartitionFields()) return result; for 
(IcebergPartitionField field : icebergPartitionFields_) { result.addToPartition_fields(field.toThrift()); diff --git a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java index 91079f310..6599fb443 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ModifyStmt.java @@ -29,15 +29,17 @@ import java.util.Set; import org.apache.impala.authorization.Privilege; import org.apache.impala.catalog.Column; +import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.FeKuduTable; import org.apache.impala.catalog.FeTable; +import org.apache.impala.catalog.IcebergPositionDeleteTable; import org.apache.impala.catalog.KuduColumn; import org.apache.impala.catalog.Type; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.Pair; import org.apache.impala.planner.DataSink; import org.apache.impala.rewrite.ExprRewriter; -import org.apache.thrift.TBaseHelper; +import org.apache.impala.thrift.TIcebergFileFormat; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -79,13 +81,14 @@ public abstract class ModifyStmt extends StatementBase { // will be modified. protected SelectStmt sourceStmt_; - // Target Kudu table. Since currently only Kudu tables are supported, we use a - // concrete table class. Result of analysis. - protected FeKuduTable table_; + // Target table. + protected FeTable table_; + + private ModifyImpl modifyImpl_; // Serialized metadata of transaction object which is set by the Frontend if the // target table is Kudu table and Kudu's transaction is enabled. 
- protected java.nio.ByteBuffer kuduTxnToken_; + java.nio.ByteBuffer kuduTxnToken_; // END: Members that need to be reset() ///////////////////////////////////////// @@ -104,7 +107,6 @@ public abstract class ModifyStmt extends StatementBase { fromClause_ = Preconditions.checkNotNull(fromClause); assignments_ = Preconditions.checkNotNull(assignmentExprs); wherePredicate_ = wherePredicate; - kuduTxnToken_ = null; } @Override @@ -159,13 +161,21 @@ public abstract class ModifyStmt extends StatementBase { Preconditions.checkNotNull(targetTableRef_); FeTable dstTbl = targetTableRef_.getTable(); - // Only Kudu tables can be updated - if (!(dstTbl instanceof FeKuduTable)) { + table_ = dstTbl; + // Only Kudu and Iceberg tables can be updated. + if (!(dstTbl instanceof FeKuduTable) && !(dstTbl instanceof FeIcebergTable)) { throw new AnalysisException( - format("Impala does not support modifying a non-Kudu table: %s", + format("Impala only supports modifying Kudu and Iceberg tables, " + + "but the following table is neither: %s", dstTbl.getFullName())); } - table_ = (FeKuduTable) dstTbl; + if (dstTbl instanceof FeKuduTable) { + modifyImpl_ = this.new ModifyKudu(); + } else if (dstTbl instanceof FeIcebergTable) { + modifyImpl_ = this.new ModifyIceberg(); + } + + modifyImpl_.analyze(analyzer); // Make sure that the user is allowed to modify the target table. Use ALL because no // UPDATE / DELETE privilege exists yet (IMPALA-3840). 
@@ -186,6 +196,7 @@ public abstract class ModifyStmt extends StatementBase { fromClause_.reset(); if (sourceStmt_ != null) sourceStmt_.reset(); table_ = null; + modifyImpl_ = null; } /** @@ -208,13 +219,7 @@ public abstract class ModifyStmt extends StatementBase { sourceStmt_ = new SelectStmt(new SelectList(selectList), fromClause_, wherePredicate_, null, null, null, null); - // cast result expressions to the correct type of the referenced slot of the - // target table - int keyColumnsOffset = table_.getPrimaryKeyColumnNames().size(); - for (int i = keyColumnsOffset; i < sourceStmt_.resultExprs_.size(); ++i) { - sourceStmt_.resultExprs_.set(i, sourceStmt_.resultExprs_.get(i).castTo( - assignments_.get(i - keyColumnsOffset).first.getType())); - } + modifyImpl_.addCastsToAssignmentsInSourceStmt(analyzer); } /** @@ -244,16 +249,8 @@ public abstract class ModifyStmt extends StatementBase { colIndexMap.put(cols.get(i).getName(), i); } - // Add the key columns as slot refs - for (String k : table_.getPrimaryKeyColumnNames()) { - List<String> path = Path.createRawPath(targetTableRef_.getUniqueAlias(), k); - SlotRef ref = new SlotRef(path); - ref.analyze(analyzer); - selectList.add(new SelectListItem(ref, null)); - uniqueSlots.add(ref.getSlotId()); - keySlots.add(ref.getSlotId()); - referencedColumns.add(colIndexMap.get(k)); - } + modifyImpl_.addKeyColumns(analyzer, selectList, referencedColumns, uniqueSlots, + keySlots, colIndexMap); // Assignments are only used in the context of updates. for (Pair<SlotRef, Expr> valueAssignment : assignments_) { @@ -341,6 +338,7 @@ public abstract class ModifyStmt extends StatementBase { * Set Kudu transaction token. 
*/ public void setKuduTransactionToken(byte[] kuduTxnToken) { + Preconditions.checkState(table_ instanceof FeKuduTable); Preconditions.checkNotNull(kuduTxnToken); kuduTxnToken_ = java.nio.ByteBuffer.wrap(kuduTxnToken.clone()); } @@ -348,10 +346,135 @@ public abstract class ModifyStmt extends StatementBase { /** * Return bytes of Kudu transaction token. */ - public byte[] getKuduTransactionToken() { - return kuduTxnToken_ == null ? null : kuduTxnToken_.array(); + public java.nio.ByteBuffer getKuduTransactionToken() { + return kuduTxnToken_; + } + + private void addKeyColumn(Analyzer analyzer, List<SelectListItem> selectList, + List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots, + Map<String, Integer> colIndexMap, String colName) throws AnalysisException { + List<String> path = Path.createRawPath(targetTableRef_.getUniqueAlias(), colName); + SlotRef ref = new SlotRef(path); + ref.analyze(analyzer); + selectList.add(new SelectListItem(ref, null)); + uniqueSlots.add(ref.getSlotId()); + keySlots.add(ref.getSlotId()); + referencedColumns.add(colIndexMap.get(colName)); } @Override public abstract String toSql(ToSqlOptions options); + + private interface ModifyImpl { + void analyze(Analyzer analyzer) throws AnalysisException; + + void addCastsToAssignmentsInSourceStmt(Analyzer analyzer) + throws AnalysisException; + + void addKeyColumns(Analyzer analyzer, + List<SelectListItem> selectList, List<Integer> referencedColumns, + Set<SlotId> uniqueSlots, Set<SlotId> keySlots, Map<String, Integer> colIndexMap) + throws AnalysisException; + } + + private class ModifyKudu implements ModifyImpl { + // Target Kudu table. Result of analysis. 
+ FeKuduTable kuduTable_ = (FeKuduTable)table_; + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException {} + + @Override + public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer) + throws AnalysisException { + // cast result expressions to the correct type of the referenced slot of the + // target table + int keyColumnsOffset = kuduTable_.getPrimaryKeyColumnNames().size(); + for (int i = keyColumnsOffset; i < sourceStmt_.resultExprs_.size(); ++i) { + sourceStmt_.resultExprs_.set(i, sourceStmt_.resultExprs_.get(i).castTo( + assignments_.get(i - keyColumnsOffset).first.getType())); + } + } + + @Override + public void addKeyColumns(Analyzer analyzer, List<SelectListItem> selectList, + List<Integer> referencedColumns, Set<SlotId> uniqueSlots, Set<SlotId> keySlots, + Map<String, Integer> colIndexMap) throws AnalysisException { + // Add the key columns as slot refs + for (String k : kuduTable_.getPrimaryKeyColumnNames()) { + addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, keySlots, + colIndexMap, k); + } + } + } + + private class ModifyIceberg implements ModifyImpl { + FeIcebergTable originalTargetTable_; + IcebergPositionDeleteTable icePosDelTable_; + + public ModifyIceberg() { + originalTargetTable_ = (FeIcebergTable)table_; + icePosDelTable_ = new IcebergPositionDeleteTable((FeIcebergTable)table_); + // Make the virtual position delete table the new target table. 
+ table_ = icePosDelTable_; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + if (ModifyStmt.this instanceof UpdateStmt) { + throw new AnalysisException("UPDATE is not supported for Iceberg table " + + originalTargetTable_.getFullName()); + } + + if (icePosDelTable_.getFormatVersion() == 1) { + throw new AnalysisException("Iceberg V1 table do not support DELETE/UPDATE " + + "operations: " + originalTargetTable_.getFullName()); + } + + String deleteMode = originalTargetTable_.getIcebergApiTable().properties().get( + org.apache.iceberg.TableProperties.DELETE_MODE); + if (deleteMode != null && !deleteMode.equals("merge-on-read")) { + throw new AnalysisException(String.format("Unsupported delete mode: '%s' for " + + "Iceberg table: %s", deleteMode, originalTargetTable_.getFullName())); + } + + if (originalTargetTable_.isPartitioned()) { + throw new AnalysisException("Cannot execute DELETE/UPDATE statement on " + + "partitioned Iceberg table: " + originalTargetTable_.getFullName()); + } + + if (originalTargetTable_.getDeleteFileFormat() != TIcebergFileFormat.PARQUET) { + throw new AnalysisException("Impala can only write delete files in PARQUET, " + + "but the given table uses a different file format: " + + originalTargetTable_.getFullName()); + } + + if (wherePredicate_ == null || + org.apache.impala.analysis.Expr.IS_TRUE_LITERAL.apply(wherePredicate_)) { + // TODO (IMPALA-12136): rewrite DELETE FROM t; statements to TRUNCATE TABLE t; + throw new AnalysisException("For deleting every row, please use TRUNCATE."); + } + } + + @Override + public void addCastsToAssignmentsInSourceStmt(Analyzer analyzer) + throws AnalysisException { + } + + @Override + public void addKeyColumns(Analyzer analyzer, List<SelectListItem> selectList, + List<Integer> referencedColumns, + Set<SlotId> uniqueSlots, Set<SlotId> keySlots, Map<String, Integer> colIndexMap) + throws AnalysisException { + String[] deleteCols; + 
Preconditions.checkState(!icePosDelTable_.isPartitioned()); + deleteCols = new String[] {"INPUT__FILE__NAME", "FILE__POSITION"}; + // Add the key columns as slot refs + for (String k : deleteCols) { + addKeyColumn(analyzer, selectList, referencedColumns, uniqueSlots, keySlots, + colIndexMap, k); + } + } + } + } diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java index 8f64b52c6..dbd415322 100644 --- a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java +++ b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java @@ -232,7 +232,7 @@ public class SlotRef extends Expr { } FeFsTable feTable = (FeFsTable) rootTable; for (HdfsFileFormat format : feTable.getFileFormats()) { - if (format != HdfsFileFormat.ORC && format != HdfsFileFormat.PARQUET) { + if (!formatSupportsQueryingStruct(format)) { throw new AnalysisException("Querying STRUCT is only supported for ORC and " + "Parquet file formats."); } @@ -241,6 +241,15 @@ public class SlotRef extends Expr { } } + // Returns true if the given HdfsFileFormat supports querying STRUCT types. Iceberg + // tables also have ICEBERG as HdfsFileFormat. We can return TRUE in case of Iceberg + // because the data file formats in the Iceberg table will be also tested separately. + private static boolean formatSupportsQueryingStruct(HdfsFileFormat format) { + return format == HdfsFileFormat.PARQUET || + format == HdfsFileFormat.ORC || + format == HdfsFileFormat.ICEBERG; + } + // Assumes this 'SlotRef' is a struct and that desc_.itemTupleDesc_ has already been // filled. Creates the children 'SlotRef's for the struct recursively. 
private void addStructChildrenAsSlotRefs() { diff --git a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java index 8cd77345b..81661657a 100644 --- a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java @@ -67,7 +67,8 @@ public class UpdateStmt extends ModifyStmt { Preconditions.checkState(table_ != null); DataSink dataSink = TableSink.create(table_, TableSink.Op.UPDATE, ImmutableList.<Expr>of(), resultExprs, referencedColumns_, false, false, - new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1, kuduTxnToken_, + new Pair<>(ImmutableList.<Integer>of(), TSortingOrder.LEXICAL), -1, + getKuduTransactionToken(), 0); Preconditions.checkState(!referencedColumns_.isEmpty()); return dataSink; diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java index ca9e2e05c..72afca9d7 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; @@ -78,6 +79,7 @@ import org.apache.impala.thrift.THdfsTable; import org.apache.impala.thrift.TIcebergCatalog; import org.apache.impala.thrift.TIcebergFileFormat; import org.apache.impala.thrift.TIcebergPartitionStats; +import org.apache.impala.thrift.TIcebergPartitionTransformType; import org.apache.impala.thrift.TIcebergTable; import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.thrift.TResultSet; @@ -175,6 +177,10 @@ public interface FeIcebergTable extends 
FeFsTable { */ int getDefaultPartitionSpecId(); + default int getFormatVersion() { + return ((BaseTable)getIcebergApiTable()).operations().current().formatVersion(); + } + /** * @return the Iceberg schema. */ @@ -287,6 +293,22 @@ public interface FeIcebergTable extends FeFsTable { return getFeFsTable().getHostIndex(); } + /** + * @return true if there's at least one partition spec that has at least one non-VOID + * partition field. + */ + default boolean isPartitioned() { + for (IcebergPartitionSpec spec : getPartitionSpecs()) { + if (spec.getIcebergPartitionFieldsSize() == 0) continue; + for (IcebergPartitionField partField : spec.getIcebergPartitionFields()) { + if (partField.getTransformType() != TIcebergPartitionTransformType.VOID) { + return true; + } + } + } + return false; + } + @Override /* FeTable */ default boolean isComputedPartitionColumn(Column col) { Preconditions.checkState(col instanceof IcebergColumn); @@ -313,6 +335,23 @@ public interface FeIcebergTable extends FeFsTable { return -1; } + default TIcebergFileFormat getWriteFileFormat() { + return IcebergUtil.getIcebergFileFormat( + getIcebergApiTable().properties().getOrDefault( + TableProperties.DEFAULT_FILE_FORMAT, + TableProperties.DEFAULT_FILE_FORMAT_DEFAULT)); + } + + default TIcebergFileFormat getDeleteFileFormat() { + String deleteFormat = + getIcebergApiTable().properties().get(TableProperties.DELETE_DEFAULT_FILE_FORMAT); + if (deleteFormat != null) { + return IcebergUtil.getIcebergFileFormat(deleteFormat); + } + // If delete file format is not specified, use write format. + return getWriteFileFormat(); + } + /** * Sets 'tableStats_' for the Iceberg table by it's partition stats. * TODO: Now the calculation of V2 Iceberg table is not accurate. 
After diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java index d5929ab36..1fc4aece7 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergPositionDeleteTable.java @@ -17,6 +17,7 @@ package org.apache.impala.catalog; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -48,6 +49,11 @@ public class IcebergPositionDeleteTable extends VirtualTable implements FeIceber public static String FILE_PATH_COLUMN = "file_path"; public static String POS_COLUMN = "pos"; + public IcebergPositionDeleteTable(FeIcebergTable baseTable) { + this(baseTable, baseTable.getName() + "-POSITION-DELETE", Collections.emptySet(), 0, + new TColumnStats()); + } + public IcebergPositionDeleteTable(FeIcebergTable baseTable, String name, Set<FileDescriptor> deleteFiles, long deleteRecordsCount, TColumnStats filePathsStats) { @@ -67,6 +73,8 @@ public class IcebergPositionDeleteTable extends VirtualTable implements FeIceber addColumn(pos); } + public FeIcebergTable getBaseTable() { return baseTable_; } + private TColumnStats getPosStats(Column pos) { TColumnStats colStats = new TColumnStats(); colStats.num_distinct_values = deleteRecordsCount_; @@ -165,7 +173,7 @@ public class IcebergPositionDeleteTable extends VirtualTable implements FeIceber @Override public List<IcebergPartitionSpec> getPartitionSpecs() { - return null; + return baseTable_.getPartitionSpecs(); } @Override diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java index 2dead16eb..2c385db9c 100644 --- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java @@ -486,10 +486,10 @@ public class IcebergTable extends Table implements 
FeIcebergTable { new IcebergPartitionTransform(field.getTransform().getTransform_type(), transformParam))); } - ret.add(new IcebergPartitionSpec(param.getPartition_id(), + ret.add(new IcebergPartitionSpec(param.getSpec_id(), fields)); } else { - ret.add(new IcebergPartitionSpec(param.getPartition_id(), null)); + ret.add(new IcebergPartitionSpec(param.getSpec_id(), null)); } } return ret; diff --git a/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java b/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java index 4b9a138fe..85e88a852 100644 --- a/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java +++ b/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java @@ -39,11 +39,10 @@ public class VirtualColumn extends Column { Type.BIGINT, TVirtualColumnType.FILE_POSITION); public static VirtualColumn getVirtualColumn(TVirtualColumnType virtColType) { - if (virtColType == TVirtualColumnType.INPUT_FILE_NAME) { - return INPUT_FILE_NAME; - } - if (virtColType == TVirtualColumnType.FILE_POSITION) { - return FILE_POSITION; + switch (virtColType) { + case INPUT_FILE_NAME: return INPUT_FILE_NAME; + case FILE_POSITION: return FILE_POSITION; + default: break; } return null; } diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 3b6320a8c..55904d3f6 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -311,10 +311,10 @@ public class HdfsScanNode extends ScanNode { private final Map<TupleDescriptor, List<Expr>> statsOriginalConjuncts_ = new LinkedHashMap<>(); - // Tuple that is used to materialize statistics when scanning Parquet files. For each - // column it can contain 0, 1, or 2 slots, depending on whether the column needs to be - // evaluated against the min and/or the max value of the corresponding - // parquet::Statistics. 
+ // Tuple that is used to materialize statistics when scanning Parquet or ORC files. + // For each column it can contain 0, 1, or 2 slots, depending on whether the column + // needs to be evaluated against the min and/or the max value of the corresponding + // file statistics. private TupleDescriptor statsTuple_; // The list of overlap predicate descs. See TOverlapPredicateDesc in PlanNodes.thrift. @@ -738,6 +738,7 @@ public class HdfsScanNode extends ScanNode { */ private void computeStatsTupleAndConjuncts(Analyzer analyzer) throws ImpalaException{ Preconditions.checkNotNull(desc_.getPath()); + if (statsTuple_ != null) return; String tupleName = desc_.getPath().toString() + " statistics"; DescriptorTable descTbl = analyzer.getDescTbl(); statsTuple_ = descTbl.createTupleDescriptor(tupleName); diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java new file mode 100644 index 000000000..d3fb6f4bd --- /dev/null +++ b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.planner; + +import java.util.List; + +import org.apache.impala.analysis.DescriptorTable; +import org.apache.impala.analysis.Expr; +import org.apache.impala.catalog.FeIcebergTable; + +import org.apache.impala.thrift.TDataSink; +import org.apache.impala.thrift.TDataSinkType; +import org.apache.impala.thrift.TExplainLevel; +import org.apache.impala.thrift.TIcebergDeleteSink; +import org.apache.impala.thrift.TQueryOptions; +import org.apache.impala.thrift.TTableSink; +import org.apache.impala.thrift.TTableSinkType; + +/** + * Sink for deleting data from Iceberg tables. Impala deletes data via 'merge-on-read' + * strategy, which means it writes Iceberg position delete files. These files contain + * information (file_path, position) about the deleted rows. Query engines reading from + * an Iceberg table need to exclude the deleted rows from the result of the table scan. + * Impala does this by doing an ANTI JOIN between data files and delete files. + */ +public class IcebergDeleteSink extends TableSink { + + // Set the limit on the maximum number of hdfs table sink instances. + // A value of 0 means no limit. + private int maxHdfsSinks_; + + public IcebergDeleteSink(FeIcebergTable targetTable, List<Expr> outputExprs, + int maxTableSinks) { + super(targetTable, Op.DELETE, outputExprs); + maxHdfsSinks_ = maxTableSinks; + } + + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + // The processing cost to export rows. + processingCost_ = computeDefaultProcessingCost(); + } + + @Override + public void computeResourceProfile(TQueryOptions queryOptions) { + PlanNode inputNode = fragment_.getPlanRoot(); + final int numInstances = fragment_.getNumInstances(); + // Input is clustered, so it produces a single partition at a time. + final long numBufferedPartitionsPerInstance = 1; + // For regular Parquet files we estimate 1GB memory consumption which is already + // a conservative, i.e. probably too high memory estimate. 
+ // Writing out position delete files means we are writing filenames and positions + // per partition. So assuming 0.5 GB per position delete file writer can be still + // considered a very conservative estimate. + final long perPartitionMemReq = 512L * 1024L * 1024L; + + long perInstanceMemEstimate; + // The estimate is based purely on the per-partition mem req if the input cardinality_ + // or the avg row size is unknown. + if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) { + perInstanceMemEstimate = numBufferedPartitionsPerInstance * perPartitionMemReq; + } else { + // The per-partition estimate may be higher than the memory required to buffer + // the entire input data. + long perInstanceInputCardinality = + Math.max(1L, inputNode.getCardinality() / numInstances); + long perInstanceInputBytes = + (long) Math.ceil(perInstanceInputCardinality * inputNode.getAvgRowSize()); + long perInstanceMemReq = + PlanNode.checkedMultiply(numBufferedPartitionsPerInstance, perPartitionMemReq); + perInstanceMemEstimate = Math.min(perInstanceInputBytes, perInstanceMemReq); + } + resourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate); + } + + @Override + public void appendSinkExplainString(String prefix, String detailPrefix, + TQueryOptions queryOptions, TExplainLevel explainLevel, StringBuilder output) { + output.append(String.format("%sDELETE FROM ICEBERG [%s]\n", prefix, + targetTable_.getFullName())); + if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { + output.append(detailPrefix + "output exprs: ") + .append(Expr.getExplainString(outputExprs_, explainLevel) + "\n"); + } + } + + @Override + protected String getLabel() { + return "ICEBERG DELETER"; + } + + @Override + protected void toThriftImpl(TDataSink tsink) { + TIcebergDeleteSink icebergDeleteSink = new TIcebergDeleteSink(); + TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID, + TTableSinkType.HDFS, sinkOp_.toThrift()); + 
tTableSink.iceberg_delete_sink = icebergDeleteSink; + tsink.table_sink = tTableSink; + tsink.output_exprs = Expr.treesToThrift(outputExprs_); + } + + @Override + protected TDataSinkType getSinkType() { + return TDataSinkType.TABLE_SINK; + } + + @Override + public void collectExprs(List<Expr> exprs) { + exprs.addAll(outputExprs_); + } + + /** + * Return an estimate of the number of nodes the fragment with this sink will + * run on. This is based on the number of nodes set for the plan root and has an + * upper limit set by the MAX_HDFS_WRITER query option. + */ + public int getNumNodes() { + int num_nodes = getFragment().getPlanRoot().getNumNodes(); + if (maxHdfsSinks_ > 0) { + // If there are more nodes than instances where the fragment was initially + // planned to run then, then the instances will be distributed evenly across them. + num_nodes = Math.min(num_nodes, getNumInstances()); + } + return num_nodes; + } + + /** + * Return an estimate of the number of instances the fragment with this sink + * will run on. This is based on the number of instances set for the plan root + * and has an upper limit set by the MAX_HDFS_WRITER query option. 
+ */ + public int getNumInstances() { + int num_instances = getFragment().getPlanRoot().getNumInstances(); + if (maxHdfsSinks_ > 0) { + num_instances = Math.min(num_instances, maxHdfsSinks_); + } + return num_instances; + } + + @Override + public void computeRowConsumptionAndProductionToCost() { + super.computeRowConsumptionAndProductionToCost(); + fragment_.setFixedInstanceCount(fragment_.getNumInstances()); + } +} diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java index 3dfc288f6..0975b4b86 100644 --- a/fe/src/main/java/org/apache/impala/planner/Planner.java +++ b/fe/src/main/java/org/apache/impala/planner/Planner.java @@ -27,6 +27,7 @@ import org.apache.impala.analysis.AnalysisContext; import org.apache.impala.analysis.AnalysisContext.AnalysisResult; import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.ColumnLineageGraph; +import org.apache.impala.analysis.DeleteStmt; import org.apache.impala.analysis.ColumnLineageGraph.ColumnLabel; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.ExprSubstitutionMap; @@ -36,6 +37,7 @@ import org.apache.impala.analysis.QueryStmt; import org.apache.impala.analysis.SortInfo; import org.apache.impala.analysis.TupleId; import org.apache.impala.catalog.FeHBaseTable; +import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.FeKuduTable; import org.apache.impala.catalog.FeTable; import org.apache.impala.common.ImpalaException; @@ -50,6 +52,7 @@ import org.apache.impala.thrift.TQueryCtx; import org.apache.impala.thrift.TQueryExecRequest; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TRuntimeFilterMode; +import org.apache.impala.thrift.TSortingOrder; import org.apache.impala.thrift.TTableName; import org.apache.impala.util.EventSequence; import org.apache.impala.util.KuduUtil; @@ -175,8 +178,14 @@ public class Planner { 
ctx_.getAnalysisResult().getUpdateStmt().createDataSink(resultExprs)); } else if (ctx_.isDelete()) { // Set up delete sink for root fragment - rootFragment.setSink( - ctx_.getAnalysisResult().getDeleteStmt().createDataSink(resultExprs)); + DeleteStmt deleteStmt = ctx_.getAnalysisResult().getDeleteStmt(); + if (deleteStmt.getTargetTable() instanceof FeIcebergTable) { + createPreDeleteSort(deleteStmt, rootFragment, ctx_.getRootAnalyzer()); + SortNode sortNode = (SortNode)rootFragment.getPlanRoot(); + resultExprs = Expr.substituteList(resultExprs, + sortNode.getSortInfo().getOutputSmap(), ctx_.getRootAnalyzer(), true); + } + rootFragment.setSink(deleteStmt.createDataSink(resultExprs)); } else if (ctx_.isQuery()) { rootFragment.setSink( ctx_.getAnalysisResult().getQueryStmt().createDataSink(resultExprs)); @@ -895,4 +904,25 @@ public class Planner { inputFragment.setPlanRoot(node); } + + public void createPreDeleteSort(DeleteStmt deleteStmt, PlanFragment inputFragment, + Analyzer analyzer) throws ImpalaException { + List<Expr> orderingExprs = new ArrayList<>(); + + orderingExprs.addAll(deleteStmt.getResultExprs()); + + // Build sortinfo to sort by the ordering exprs. 
+ List<Boolean> isAscOrder = Collections.nCopies(orderingExprs.size(), true); + List<Boolean> nullsFirstParams = Collections.nCopies(orderingExprs.size(), false); + SortInfo sortInfo = new SortInfo(orderingExprs, isAscOrder, nullsFirstParams, + TSortingOrder.LEXICAL); + sortInfo.createSortTupleInfo(deleteStmt.getResultExprs(), analyzer); + sortInfo.getSortTupleDescriptor().materializeSlots(); + + PlanNode node = SortNode.createTotalSortNode( + ctx_.getNextNodeId(), inputFragment.getPlanRoot(), sortInfo, 0); + node.init(analyzer); + + inputFragment.setPlanRoot(node); + } } diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java index 0f34ddb20..df4fd4c73 100644 --- a/fe/src/main/java/org/apache/impala/planner/TableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.impala.analysis.Expr; import org.apache.impala.catalog.FeFsTable; import org.apache.impala.catalog.FeHBaseTable; +import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.FeKuduTable; import org.apache.impala.catalog.FeTable; import org.apache.impala.common.Pair; @@ -128,6 +129,17 @@ public abstract class TableSink extends DataSink { Preconditions.checkNotNull(partitionKeyExprs); Preconditions.checkNotNull(referencedColumns); Preconditions.checkNotNull(sortProperties.first); + if (table instanceof FeIcebergTable) { + if (sinkAction == Op.INSERT) { + return new HdfsTableSink(table, partitionKeyExprs,outputExprs, overwrite, + inputIsClustered, sortProperties, writeId, maxTableSinks, isResultSink); + } else if (sinkAction == Op.DELETE) { + return new IcebergDeleteSink((FeIcebergTable)table, outputExprs, maxTableSinks); + } else { + // We don't support any other sink actions yet. + Preconditions.checkState(false); + } + } if (table instanceof FeFsTable) { // Hdfs only supports inserts. 
Preconditions.checkState(sinkAction == Op.INSERT); diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 811db2cfc..b48f4e68a 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -6842,7 +6842,7 @@ public class CatalogOpExecutor { if (table instanceof FeIcebergTable && update.isSetIceberg_operation()) { FeIcebergTable iceTbl = (FeIcebergTable)table; org.apache.iceberg.Transaction iceTxn = IcebergUtil.getIcebergTransaction(iceTbl); - IcebergCatalogOpExecutor.appendFiles(iceTbl, iceTxn, + IcebergCatalogOpExecutor.execute(iceTbl, iceTxn, update.getIceberg_operation()); if (isIcebergHmsIntegrationEnabled(iceTbl.getMetaStoreTable())) { // Add catalog service id and the 'newCatalogVersion' to the table parameters. diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 248018130..f10762c50 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -74,6 +74,7 @@ import org.apache.impala.analysis.CreateDataSrcStmt; import org.apache.impala.analysis.CreateDropRoleStmt; import org.apache.impala.analysis.CreateUdaStmt; import org.apache.impala.analysis.CreateUdfStmt; +import org.apache.impala.analysis.DeleteStmt; import org.apache.impala.analysis.DescribeTableStmt; import org.apache.impala.analysis.DescriptorTable; import org.apache.impala.analysis.DropDataSrcStmt; @@ -119,6 +120,7 @@ import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.FeKuduTable; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.Function; +import org.apache.impala.catalog.IcebergPositionDeleteTable; import org.apache.impala.catalog.ImpaladCatalog; import 
org.apache.impala.catalog.ImpaladTableUsageTracker; import org.apache.impala.catalog.MaterializedViewHdfsTable; @@ -166,6 +168,8 @@ import org.apache.impala.thrift.TDdlType; import org.apache.impala.thrift.TDescribeHistoryParams; import org.apache.impala.thrift.TDescribeOutputStyle; import org.apache.impala.thrift.TDescribeResult; +import org.apache.impala.thrift.TIcebergDmlFinalizeParams; +import org.apache.impala.thrift.TIcebergOperation; import org.apache.impala.thrift.TExecRequest; import org.apache.impala.thrift.TExecutorGroupSet; import org.apache.impala.thrift.TExplainResult; @@ -2478,6 +2482,10 @@ public class Frontend { analysisResult.isUpdateStmt() || analysisResult.isDeleteStmt()); result.stmt_type = TStmtType.DML; result.query_exec_request.stmt_type = TStmtType.DML; + if (analysisResult.isDeleteStmt()) { + addFinalizationParamsForDelete(queryCtx, queryExecRequest, + analysisResult.getDeleteStmt()); + } } return result; } catch (Exception e) { @@ -2540,33 +2548,48 @@ public class Frontend { targetTable, insertStmt.getWriteId(), insertStmt.isOverwrite()); } + private static void addFinalizationParamsForDelete( + TQueryCtx queryCtx, TQueryExecRequest queryExecRequest, DeleteStmt deleteStmt) { + FeTable targetTable = deleteStmt.getTargetTable(); + if (!(targetTable instanceof FeIcebergTable)) return; + Preconditions.checkState(targetTable instanceof IcebergPositionDeleteTable); + targetTable = ((IcebergPositionDeleteTable)targetTable).getBaseTable(); + TFinalizeParams finalizeParams = addFinalizationParamsForDml( + queryCtx, targetTable, false); + TIcebergDmlFinalizeParams iceFinalizeParams = new TIcebergDmlFinalizeParams(); + iceFinalizeParams.operation = TIcebergOperation.DELETE; + FeIcebergTable iceTable = (FeIcebergTable)targetTable; + iceFinalizeParams.setSpec_id(iceTable.getDefaultPartitionSpecId()); + iceFinalizeParams.setInitial_snapshot_id(iceTable.snapshotId()); + finalizeParams.setIceberg_params(iceFinalizeParams); + 
queryExecRequest.setFinalize_params(finalizeParams); + } + // This is public to allow external frontends to utilize this method to fill in the // finalization parameters for externally generated INSERTs. public static void addFinalizationParamsForInsert( TQueryCtx queryCtx, TQueryExecRequest queryExecRequest, FeTable targetTable, - long writeId, boolean isOverwrite) { + long writeId, boolean isOverwrite) { + TFinalizeParams finalizeParams = addFinalizationParamsForDml( + queryCtx, targetTable, isOverwrite); if (targetTable instanceof FeFsTable) { - TFinalizeParams finalizeParams = new TFinalizeParams(); - finalizeParams.setIs_overwrite(isOverwrite); - finalizeParams.setTable_name(targetTable.getTableName().getTbl()); - finalizeParams.setTable_id(DescriptorTable.TABLE_SINK_ID); - String db = targetTable.getTableName().getDb(); - finalizeParams.setTable_db(db == null ? queryCtx.session.database : db); - FeFsTable hdfsTable = (FeFsTable) targetTable; - finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir()); if (writeId != -1) { Preconditions.checkState(queryCtx.isSetTransaction_id()); finalizeParams.setTransaction_id(queryCtx.getTransaction_id()); finalizeParams.setWrite_id(writeId); } else if (targetTable instanceof FeIcebergTable) { FeIcebergTable iceTable = (FeIcebergTable)targetTable; - finalizeParams.setSpec_id(iceTable.getDefaultPartitionSpecId()); - finalizeParams.setInitial_snapshot_id(iceTable.snapshotId()); + TIcebergDmlFinalizeParams iceFinalizeParams = new TIcebergDmlFinalizeParams(); + iceFinalizeParams.operation = TIcebergOperation.INSERT; + iceFinalizeParams.setSpec_id(iceTable.getDefaultPartitionSpecId()); + iceFinalizeParams.setInitial_snapshot_id(iceTable.snapshotId()); + finalizeParams.setIceberg_params(iceFinalizeParams); } else { // TODO: Currently this flag only controls the removal of the query-level staging // directory. HdfsTableSink (that creates the staging dir) calculates the path // independently. 
So it'd be better to either remove this option, or make it used // everywhere where the staging directory is referenced. + FeFsTable hdfsTable = (FeFsTable) targetTable; finalizeParams.setStaging_dir( hdfsTable.getHdfsBaseDir() + "/_impala_insert_staging"); } @@ -2574,6 +2597,21 @@ public class Frontend { } } + private static TFinalizeParams addFinalizationParamsForDml(TQueryCtx queryCtx, + FeTable targetTable, boolean isOverwrite) { + TFinalizeParams finalizeParams = new TFinalizeParams(); + if (targetTable instanceof FeFsTable) { + finalizeParams.setIs_overwrite(isOverwrite); + finalizeParams.setTable_name(targetTable.getTableName().getTbl()); + finalizeParams.setTable_id(DescriptorTable.TABLE_SINK_ID); + String db = targetTable.getTableName().getDb(); + finalizeParams.setTable_db(db == null ? queryCtx.session.database : db); + FeFsTable hdfsTable = (FeFsTable) targetTable; + finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir()); + } + return finalizeParams; + } + /** * Add the metadata for the result set */ diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java index 03566774c..f419fc41b 100644 --- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java @@ -28,10 +28,12 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.DeleteFiles; import org.apache.iceberg.ExpireSnapshots; +import org.apache.iceberg.FileMetadata; import org.apache.iceberg.ManageSnapshots; import org.apache.iceberg.Metrics; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.ReplacePartitions; +import org.apache.iceberg.RowDelta; import org.apache.iceberg.Schema; import org.apache.iceberg.SnapshotManager; import org.apache.iceberg.Table; @@ -325,6 +327,44 @@ public class IcebergCatalogOpExecutor { } } + public static void 
execute(FeIcebergTable feIcebergTable, Transaction txn, + TIcebergOperationParam icebergOp) throws ImpalaRuntimeException { + switch (icebergOp.operation) { + case INSERT: appendFiles(feIcebergTable, txn, icebergOp); break; + case DELETE: deleteRows(feIcebergTable, txn, icebergOp); break; + default: throw new ImpalaRuntimeException( + "Unknown Iceberg operation: " + icebergOp.operation); + } + } + + public static void deleteRows(FeIcebergTable feIcebergTable, Transaction txn, + TIcebergOperationParam icebergOp) throws ImpalaRuntimeException { + org.apache.iceberg.Table nativeIcebergTable = feIcebergTable.getIcebergApiTable(); + List<ByteBuffer> deleteFilesFb = icebergOp.getIceberg_delete_files_fb(); + RowDelta rowDelta = txn.newRowDelta(); + for (ByteBuffer buf : deleteFilesFb) { + FbIcebergDataFile deleteFile = FbIcebergDataFile.getRootAsFbIcebergDataFile(buf); + + PartitionSpec partSpec = nativeIcebergTable.specs().get(icebergOp.getSpec_id()); + Metrics metrics = buildDataFileMetrics(deleteFile); + FileMetadata.Builder builder = FileMetadata.deleteFileBuilder(partSpec) + .ofPositionDeletes() + .withMetrics(metrics) + .withPath(deleteFile.path()) + .withFormat(IcebergUtil.fbFileFormatToIcebergFileFormat(deleteFile.format())) + .withRecordCount(deleteFile.recordCount()) + .withFileSizeInBytes(deleteFile.fileSizeInBytes()); + rowDelta.addDeletes(builder.build()); + } + try { + rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id()); + rowDelta.validateNoConflictingDataFiles(); + rowDelta.commit(); + } catch (ValidationException e) { + throw new ImpalaRuntimeException(e.getMessage(), e); + } + } + /** * Append the newly inserted data files to the Iceberg table using the AppendFiles * API. 
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java index f44caccf5..aefba0c11 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeModifyStmtsTest.java @@ -119,7 +119,8 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest { AnalyzesOk("update functional_kudu.dimtbl set name = substr('hallo', 3)"); // Only Kudu tables can be updated AnalysisError("update functional.alltypes set intcol = 99", - "Impala does not support modifying a non-Kudu table: functional.alltypes"); + "Impala only supports modifying Kudu and Iceberg tables, but the following "+ + "table is neither: functional.alltypes"); // Non existing column in update AnalysisError("update functional_kudu.dimtbl set links='10'", "Could not resolve column/field reference: 'links'"); @@ -201,10 +202,12 @@ public class AnalyzeModifyStmtsTest extends AnalyzerTest { AnalysisError( "update a set b.name =" + " 'Oskar' FROM functional.testtbl a join functional_kudu.testtbl b", - "Impala does not support modifying a non-Kudu table: functional.testtbl"); + "Impala only supports modifying Kudu and Iceberg tables, but the following " + + "table is neither: functional.testtbl"); AnalysisError( "delete a FROM functional.testtbl a join functional_kudu.testtbl b", - "Impala does not support modifying a non-Kudu table: functional.testtbl"); + "Impala only supports modifying Kudu and Iceberg tables, but the " + + "following table is neither: functional.testtbl"); } @Test diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index b032ebbce..2a274fb2b 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -1282,6 +1282,16 @@ public class PlannerTest extends 
PlannerTestBase { ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY)); } + + /** + * Check that Iceberg V2 DELETE statements work as expected. + */ + @Test + public void testIcebergV2Delete() { + runPlannerTestFile("iceberg-v2-delete", "functional_parquet", + ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY)); + } + /** * Check that Iceberg metadata table scan plans are as expected. */ diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test new file mode 100644 index 000000000..fa13b70cc --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test @@ -0,0 +1,195 @@ +DELETE FROM iceberg_v2_no_deletes where i = 3 +---- PLAN +DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE] +| +01:SORT +| order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| row-size=20B cardinality=1 +| +00:SCAN HDFS [functional_parquet.iceberg_v2_no_deletes] + HDFS partitions=1/1 files=1 size=625B + predicates: i = 3 + row-size=24B cardinality=1 +---- DISTRIBUTEDPLAN +DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE] +| +01:SORT +| order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| row-size=20B cardinality=1 +| +00:SCAN HDFS [functional_parquet.iceberg_v2_no_deletes] + HDFS partitions=1/1 files=1 size=625B + predicates: i = 3 + row-size=24B cardinality=1 +==== +DELETE FROM iceberg_v2_delete_positional where id = 15 +---- PLAN +DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] +| +03:SORT +| order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| row-size=20B cardinality=1 +| +02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN] +| row-size=28B cardinality=1 +| +|--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 
functional_parquet.iceberg_v2_delete_positional-position-delete] +| HDFS partitions=1/1 files=1 size=1.54KB +| row-size=182B cardinality=1 +| +00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional] + HDFS partitions=1/1 files=1 size=662B + predicates: id = 15 + row-size=28B cardinality=1 +---- DISTRIBUTEDPLAN +DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] +| +04:SORT +| order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| row-size=20B cardinality=1 +| +02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST] +| row-size=28B cardinality=1 +| +|--03:EXCHANGE [BROADCAST] +| | +| 01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete] +| HDFS partitions=1/1 files=1 size=1.54KB +| row-size=182B cardinality=1 +| +00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional] + HDFS partitions=1/1 files=1 size=662B + predicates: id = 15 + row-size=28B cardinality=1 +==== +DELETE FROM iceberg_v2_delete_positional +where id = (select min(id) from iceberg_v2_delete_positional) +---- PLAN +DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] +| +08:SORT +| order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| row-size=20B cardinality=3 +| +07:HASH JOIN [LEFT SEMI JOIN] +| hash predicates: id = min(id) +| runtime filters: RF000 <- min(id) +| row-size=28B cardinality=3 +| +|--06:AGGREGATE [FINALIZE] +| | output: min(id) +| | row-size=8B cardinality=1 +| | +| 05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN] +| | row-size=28B cardinality=3 +| | +| |--04:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-04 functional_parquet.iceberg_v2_delete_positional-position-delete] +| | HDFS partitions=1/1 files=1 size=1.54KB +| | row-size=182B cardinality=1 +| | +| 03:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional] +| HDFS partitions=1/1 files=1 
size=662B +| row-size=28B cardinality=3 +| +02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN] +| row-size=28B cardinality=3 +| +|--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete] +| HDFS partitions=1/1 files=1 size=1.54KB +| row-size=182B cardinality=1 +| +00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional] + HDFS partitions=1/1 files=1 size=662B + runtime filters: RF000 -> id + row-size=28B cardinality=3 +---- DISTRIBUTEDPLAN +DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] +| +13:SORT +| order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| row-size=20B cardinality=3 +| +07:HASH JOIN [LEFT SEMI JOIN, BROADCAST] +| hash predicates: id = min(id) +| runtime filters: RF000 <- min(id) +| row-size=28B cardinality=3 +| +|--12:EXCHANGE [BROADCAST] +| | +| 11:AGGREGATE [FINALIZE] +| | output: min:merge(id) +| | row-size=8B cardinality=1 +| | +| 10:EXCHANGE [UNPARTITIONED] +| | +| 06:AGGREGATE +| | output: min(id) +| | row-size=8B cardinality=1 +| | +| 05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST] +| | row-size=28B cardinality=3 +| | +| |--09:EXCHANGE [BROADCAST] +| | | +| | 04:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-04 functional_parquet.iceberg_v2_delete_positional-position-delete] +| | HDFS partitions=1/1 files=1 size=1.54KB +| | row-size=182B cardinality=1 +| | +| 03:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional] +| HDFS partitions=1/1 files=1 size=662B +| row-size=28B cardinality=3 +| +02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST] +| row-size=28B cardinality=3 +| +|--08:EXCHANGE [BROADCAST] +| | +| 01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete] +| HDFS partitions=1/1 files=1 size=1.54KB +| row-size=182B cardinality=1 +| +00:SCAN HDFS 
[functional_parquet.iceberg_v2_delete_positional] + HDFS partitions=1/1 files=1 size=662B + runtime filters: RF000 -> id + row-size=28B cardinality=3 +==== +DELETE FROM iceberg_v2_delete_positional WHERE FILE__POSITION = id +---- PLAN +DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] +| +03:SORT +| order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| row-size=20B cardinality=1 +| +02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN] +| row-size=28B cardinality=1 +| +|--01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete] +| HDFS partitions=1/1 files=1 size=1.54KB +| row-size=182B cardinality=1 +| +00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional] + HDFS partitions=1/1 files=1 size=662B + predicates: FILE__POSITION = id + row-size=28B cardinality=1 +---- DISTRIBUTEDPLAN +DELETE FROM ICEBERG [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE] +| +04:SORT +| order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST +| row-size=20B cardinality=1 +| +02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST] +| row-size=28B cardinality=1 +| +|--03:EXCHANGE [BROADCAST] +| | +| 01:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE-01 functional_parquet.iceberg_v2_delete_positional-position-delete] +| HDFS partitions=1/1 files=1 size=1.54KB +| row-size=182B cardinality=1 +| +00:SCAN HDFS [functional_parquet.iceberg_v2_delete_positional] + HDFS partitions=1/1 files=1 size=662B + predicates: FILE__POSITION = id + row-size=28B cardinality=1 +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-complex.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-complex.test new file mode 100644 index 000000000..d8cf7ac0a --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete-complex.test @@ 
-0,0 +1,106 @@ +==== +---- QUERY +SELECT * FROM ice_complex_delete; +---- RESULTS +1,'[1,2,3]','[[1,2],[3,4]]','{"k1":1,"k2":100}','[{"k1":1}]','{"a":1,"b":[1],"c":{"d":[[{"e":10,"f":"aaa"},{"e":-10,"f":"bbb"}],[{"e":11,"f":"c"}]]},"g":{"foo":{"h":{"i":[1.1]}}}}' +1,'[1,2,3]','[[1,2],[3,4]]','{"k1":1,"k2":100}','[{"k1":1}]','{"a":1,"b":[1],"c":{"d":[[{"e":10,"f":"aaa"},{"e":-10,"f":"bbb"}],[{"e":11,"f":"c"}]]},"g":{"foo":{"h":{"i":[1.1]}}}}' +2,'[null,1,2,null,3,null]','[[null,1,2,null],[3,null,4],[],null]','{"k1":2,"k2":null}','[{"k1":1,"k3":null},null,{}]','{"a":null,"b":[null],"c":{"d":[[{"e":null,"f":null},{"e":10,"f":"aaa"},{"e":null,"f":null},{"e":-10,"f":"bbb"},{"e":null,"f":null}],[{"e":11,"f":"c"},null],[],null]},"g":{"g1":{"h":{"i":[2.2,null]}},"g2":{"h":{"i":[]}},"g3":null,"g4":{"h":{"i":null}},"g5":{"h":null}}}' +2,'[null,1,2,null,3,null]','[[null,1,2,null],[3,null,4],[],null]','{"k1":2,"k2":null}','[{"k1":1,"k3":null},null,{}]','{"a":null,"b":[null],"c":{"d":[[{"e":null,"f":null},{"e":10,"f":"aaa"},{"e":null,"f":null},{"e":-10,"f":"bbb"},{"e":null,"f":null}],[{"e":11,"f":"c"},null],[],null]},"g":{"g1":{"h":{"i":[2.2,null]}},"g2":{"h":{"i":[]}},"g3":null,"g4":{"h":{"i":null}},"g5":{"h":null}}}' +3,'[]','[null]','{}','[null,null]','{"a":null,"b":null,"c":{"d":[]},"g":{}}' +3,'[]','[null]','{}','[null,null]','{"a":null,"b":null,"c":{"d":[]},"g":{}}' +4,'NULL','[]','{}','[]','{"a":null,"b":null,"c":{"d":null},"g":null}' +4,'NULL','[]','{}','[]','{"a":null,"b":null,"c":{"d":null},"g":null}' +5,'NULL','NULL','{}','NULL','{"a":null,"b":null,"c":null,"g":{"foo":{"h":{"i":[2.2,3.3]}}}}' +5,'NULL','NULL','{}','NULL','{"a":null,"b":null,"c":null,"g":{"foo":{"h":{"i":[2.2,3.3]}}}}' +6,'NULL','NULL','NULL','NULL','NULL' +6,'NULL','NULL','NULL','NULL','NULL' +7,'NULL','[null,[5,6]]','{"k1":null,"k3":null}','NULL','{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}' 
+7,'NULL','[null,[5,6]]','{"k1":null,"k3":null}','NULL','{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}' +8,'[-1]','[[-1,-2],[]]','{"k1":-1}','[{},{"k1":1},{},{}]','{"a":-1,"b":[-1],"c":{"d":[[{"e":-1,"f":"nonnullable"}]]},"g":{}}' +8,'[-1]','[[-1,-2],[]]','{"k1":-1}','[{},{"k1":1},{},{}]','{"a":-1,"b":[-1],"c":{"d":[[{"e":-1,"f":"nonnullable"}]]},"g":{}}' +---- TYPES +BIGINT, STRING, STRING, STRING, STRING, STRING +==== +---- QUERY +SELECT id, INPUT__FILE__NAME FROM ice_complex_delete WHERE id % 2 = 0 AND INPUT__FILE__NAME LIKE '%orc'; +---- RESULTS +2,regex:'$NAMENODE/.*.orc' +4,regex:'$NAMENODE/.*.orc' +6,regex:'$NAMENODE/.*.orc' +8,regex:'$NAMENODE/.*.orc' +---- TYPES +BIGINT, STRING +==== +---- QUERY +DELETE FROM ice_complex_delete WHERE id % 2 = 0 AND INPUT__FILE__NAME LIKE '%orc'; +SELECT *, INPUT__FILE__NAME FROM ice_complex_delete; +---- RESULTS +1,'[1,2,3]','[[1,2],[3,4]]','{"k1":1,"k2":100}','[{"k1":1}]','{"a":1,"b":[1],"c":{"d":[[{"e":10,"f":"aaa"},{"e":-10,"f":"bbb"}],[{"e":11,"f":"c"}]]},"g":{"foo":{"h":{"i":[1.1]}}}}',regex:'$NAMENODE/.*.parquet' +1,'[1,2,3]','[[1,2],[3,4]]','{"k1":1,"k2":100}','[{"k1":1}]','{"a":1,"b":[1],"c":{"d":[[{"e":10,"f":"aaa"},{"e":-10,"f":"bbb"}],[{"e":11,"f":"c"}]]},"g":{"foo":{"h":{"i":[1.1]}}}}',regex:'$NAMENODE/.*.orc' +2,'[null,1,2,null,3,null]','[[null,1,2,null],[3,null,4],[],null]','{"k1":2,"k2":null}','[{"k1":1,"k3":null},null,{}]','{"a":null,"b":[null],"c":{"d":[[{"e":null,"f":null},{"e":10,"f":"aaa"},{"e":null,"f":null},{"e":-10,"f":"bbb"},{"e":null,"f":null}],[{"e":11,"f":"c"},null],[],null]},"g":{"g1":{"h":{"i":[2.2,null]}},"g2":{"h":{"i":[]}},"g3":null,"g4":{"h":{"i":null}},"g5":{"h":null}}}',regex:'$NAMENODE/.*.parquet' +3,'[]','[null]','{}','[null,null]','{"a":null,"b":null,"c":{"d":[]},"g":{}}',regex:'$NAMENODE/.*.parquet' +3,'[]','[null]','{}','[null,null]','{"a":null,"b":null,"c":{"d":[]},"g":{}}',regex:'$NAMENODE/.*.orc' 
+4,'NULL','[]','{}','[]','{"a":null,"b":null,"c":{"d":null},"g":null}',regex:'$NAMENODE/.*.parquet' +5,'NULL','NULL','{}','NULL','{"a":null,"b":null,"c":null,"g":{"foo":{"h":{"i":[2.2,3.3]}}}}',regex:'$NAMENODE/.*.parquet' +5,'NULL','NULL','{}','NULL','{"a":null,"b":null,"c":null,"g":{"foo":{"h":{"i":[2.2,3.3]}}}}',regex:'$NAMENODE/.*.orc' +6,'NULL','NULL','NULL','NULL','NULL',regex:'$NAMENODE/.*.parquet' +7,'NULL','[null,[5,6]]','{"k1":null,"k3":null}','NULL','{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}',regex:'$NAMENODE/.*.parquet' +7,'NULL','[null,[5,6]]','{"k1":null,"k3":null}','NULL','{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}',regex:'$NAMENODE/.*.orc' +8,'[-1]','[[-1,-2],[]]','{"k1":-1}','[{},{"k1":1},{},{}]','{"a":-1,"b":[-1],"c":{"d":[[{"e":-1,"f":"nonnullable"}]]},"g":{}}',regex:'$NAMENODE/.*.parquet' +---- TYPES +BIGINT, STRING, STRING, STRING, STRING, STRING, STRING +==== +---- QUERY +SELECT id, INPUT__FILE__NAME FROM ice_complex_delete WHERE id % 2 = 1 AND INPUT__FILE__NAME LIKE '%parquet'; +---- RESULTS +1,regex:'$NAMENODE/.*.parquet' +3,regex:'$NAMENODE/.*.parquet' +5,regex:'$NAMENODE/.*.parquet' +7,regex:'$NAMENODE/.*.parquet' +---- TYPES +BIGINT, STRING +==== +---- QUERY +DELETE FROM ice_complex_delete WHERE id % 2 = 1 AND INPUT__FILE__NAME LIKE '%parquet'; +SELECT *, INPUT__FILE__NAME FROM ice_complex_delete; +---- RESULTS +1,'[1,2,3]','[[1,2],[3,4]]','{"k1":1,"k2":100}','[{"k1":1}]','{"a":1,"b":[1],"c":{"d":[[{"e":10,"f":"aaa"},{"e":-10,"f":"bbb"}],[{"e":11,"f":"c"}]]},"g":{"foo":{"h":{"i":[1.1]}}}}',regex:'$NAMENODE/.*.orc' 
+2,'[null,1,2,null,3,null]','[[null,1,2,null],[3,null,4],[],null]','{"k1":2,"k2":null}','[{"k1":1,"k3":null},null,{}]','{"a":null,"b":[null],"c":{"d":[[{"e":null,"f":null},{"e":10,"f":"aaa"},{"e":null,"f":null},{"e":-10,"f":"bbb"},{"e":null,"f":null}],[{"e":11,"f":"c"},null],[],null]},"g":{"g1":{"h":{"i":[2.2,null]}},"g2":{"h":{"i":[]}},"g3":null,"g4":{"h":{"i":null}},"g5":{"h":null}}}',regex:'$NAMENODE/.*.parquet' +3,'[]','[null]','{}','[null,null]','{"a":null,"b":null,"c":{"d":[]},"g":{}}',regex:'$NAMENODE/.*.orc' +4,'NULL','[]','{}','[]','{"a":null,"b":null,"c":{"d":null},"g":null}',regex:'$NAMENODE/.*.parquet' +5,'NULL','NULL','{}','NULL','{"a":null,"b":null,"c":null,"g":{"foo":{"h":{"i":[2.2,3.3]}}}}',regex:'$NAMENODE/.*.orc' +6,'NULL','NULL','NULL','NULL','NULL',regex:'$NAMENODE/.*.parquet' +7,'NULL','[null,[5,6]]','{"k1":null,"k3":null}','NULL','{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}',regex:'$NAMENODE/.*.orc' +8,'[-1]','[[-1,-2],[]]','{"k1":-1}','[{},{"k1":1},{},{}]','{"a":-1,"b":[-1],"c":{"d":[[{"e":-1,"f":"nonnullable"}]]},"g":{}}',regex:'$NAMENODE/.*.parquet' +---- TYPES +BIGINT, STRING, STRING, STRING, STRING, STRING, STRING +==== +---- QUERY +DELETE FROM ice_complex_delete WHERE id < 4; +SELECT *, INPUT__FILE__NAME FROM ice_complex_delete; +---- RESULTS +4,'NULL','[]','{}','[]','{"a":null,"b":null,"c":{"d":null},"g":null}',regex:'$NAMENODE/.*.parquet' +5,'NULL','NULL','{}','NULL','{"a":null,"b":null,"c":null,"g":{"foo":{"h":{"i":[2.2,3.3]}}}}',regex:'$NAMENODE/.*.orc' +6,'NULL','NULL','NULL','NULL','NULL',regex:'$NAMENODE/.*.parquet' +7,'NULL','[null,[5,6]]','{"k1":null,"k3":null}','NULL','{"a":7,"b":[2,3,null],"c":{"d":[[],[null],null]},"g":null}',regex:'$NAMENODE/.*.orc' +8,'[-1]','[[-1,-2],[]]','{"k1":-1}','[{},{"k1":1},{},{}]','{"a":-1,"b":[-1],"c":{"d":[[{"e":-1,"f":"nonnullable"}]]},"g":{}}',regex:'$NAMENODE/.*.parquet' +---- TYPES +BIGINT, STRING, STRING, STRING, STRING, STRING, STRING +==== +---- QUERY +select id, 
unnest(int_array), input__file__name from ice_complex_delete; +---- RESULTS +8,-1,regex:'.*.parquet' +---- TYPES +BIGINT, INT, STRING +==== +---- QUERY +select id, unnest(int_array_array), input__file__name from ice_complex_delete; +---- RESULTS +7,'NULL',regex:'.*.orc' +7,'[5,6]',regex:'.*.orc' +8,'[-1,-2]',regex:'.*.parquet' +8,'[]',regex:'.*.parquet' +---- TYPES +BIGINT, STRING, STRING +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete.test new file mode 100644 index 000000000..45c5105ca --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-delete.test @@ -0,0 +1,117 @@ +==== +---- QUERY +CREATE TABLE ice_delete (i int, s string) +STORED BY ICEBERG +TBLPROPERTIES ('format-version'='2'); +==== +---- QUERY +# Delete from empty table is no-op. +DELETE FROM ice_delete where i = 1; +SELECT * FROM ice_delete; +---- RESULTS +---- TYPES +INT,STRING +==== +---- QUERY +INSERT INTO ice_delete VALUES(1, 'one'), (2, 'two'), (3, 'three'); +DELETE FROM ice_delete WHERE i = 2; +SELECT * FROM ice_delete; +---- RESULTS +1,'one' +3,'three' +---- TYPES +INT,STRING +==== +---- QUERY +SELECT count(*) FROM ice_delete; +---- RESULTS +2 +---- TYPES +BIGINT +==== +---- QUERY +INSERT INTO ice_delete VALUES (4, 'four'), (5, 'five'), (6, 'six'); +SELECT * FROM ice_delete; +---- RESULTS +1,'one' +3,'three' +4,'four' +5,'five' +6,'six' +---- TYPES +INT,STRING +==== +---- QUERY +DELETE FROM ice_delete WHERE s like 'f%' and i > 4; +SELECT * FROM ice_delete; +---- RESULTS +1,'one' +3,'three' +4,'four' +6,'six' +---- TYPES +INT,STRING +==== +---- QUERY +INSERT INTO ice_delete VALUES (7, 'seven'), (8, 'eight'); +DELETE FROM ice_delete WHERE i in (SELECT i FROM ice_delete where s in ('one', 'three')); +SELECT * FROM ice_delete; +---- RESULTS +4,'four' +6,'six' +7,'seven' +8,'eight' +---- TYPES +INT,STRING +==== +---- QUERY +DELETE FROM ice_delete WHERE 
FILE__POSITION = 0; +SELECT * FROM ice_delete; +---- RESULTS +6,'six' +8,'eight' +---- TYPES +INT,STRING +==== +---- QUERY +INSERT INTO ice_delete VALUES (9, 'nine'), (10, 'ten'); +DELETE FROM ice_delete WHERE s = (SELECT min(s) FROM ice_delete); +SELECT * FROM ice_delete; +---- RESULTS +6,'six' +9,'nine' +10,'ten' +---- TYPES +INT,STRING +==== +---- QUERY +DELETE FROM ice_delete WHERE i < 10; +SELECT * FROM ice_delete; +---- RESULTS +10,'ten' +---- TYPES +INT,STRING +==== +---- QUERY +DELETE FROM ice_delete WHERE i = 1000; +SELECT * FROM ice_delete; +---- RESULTS +10,'ten' +---- TYPES +INT,STRING +==== +---- QUERY +CREATE TABLE ice_lineitem STORED BY ICEBERG +TBLPROPERTIES ('format-version'='2') +AS SELECT * FROM tpch_parquet.lineitem; +DELETE FROM ice_lineitem WHERE l_orderkey % 5 = 1; +SELECT count(*) FROM ice_lineitem; +---- RESULTS +4799418 +---- TYPES +BIGINT +==== +---- QUERY +SELECT * FROM ice_lineitem WHERE l_orderkey % 5 = 1; +---- RESULTS +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test index 8cebc653a..6be8c01ff 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test @@ -683,3 +683,55 @@ select * from functional_parquet.iceberg_v2_delete_equality for system_time as o ---- CATCH ImpalaRuntimeException: Iceberg table functional_parquet.iceberg_v2_delete_equality has EQUALITY delete file which is currently not supported by Impala ==== +---- QUERY +create table ice_delete (i int, s string) +stored by iceberg +tblproperties ('format-version'='1'); +==== +---- QUERY +# Cannot DELETE from Iceberg V1 table +delete from ice_delete where i = 1; +---- CATCH +AnalysisException: Iceberg V1 table do not support DELETE/UPDATE operations: $DATABASE.ice_delete +==== +---- QUERY +# For deleting every row, users must use TRUNCATE. 
+alter table ice_delete set tblproperties ('format-version'='2'); +delete from ice_delete; +---- CATCH +AnalysisException: For deleting every row, please use TRUNCATE. +==== +---- QUERY +delete from ice_delete where true; +---- CATCH +AnalysisException: For deleting every row, please use TRUNCATE. +==== +---- QUERY +# Cannot delete from Iceberg table if the write format is not Parquet (and there is no delete format) +alter table ice_delete set tblproperties ('write.format.default'='ORC'); +delete from ice_delete where i = 1; +---- CATCH +AnalysisException: Impala can only write delete files in PARQUET, but the given table uses a different file format: $DATABASE.ice_delete +==== +---- QUERY +# Cannot delete from Iceberg table if the delete format is not Parquet +alter table ice_delete set tblproperties ('write.format.default'='PARQUET', 'write.delete.format.default'='ORC'); +delete from ice_delete where i = 1; +---- CATCH +AnalysisException: Impala can only write delete files in PARQUET, but the given table uses a different file format: $DATABASE.ice_delete +==== +---- QUERY +# Cannot delete from Iceberg table if the delete mode is not 'merge-on-read' +alter table ice_delete set tblproperties ('write.delete.format.default'='PARQUET', 'write.delete.mode'='copy-on-write'); +delete from ice_delete where i = 1; +---- CATCH +AnalysisException: Unsupported delete mode: 'copy-on-write' for Iceberg table: $DATABASE.ice_delete +==== +---- QUERY +# Cannot delete from partitioned Iceberg table.
+alter table ice_delete set tblproperties ('write.delete.mode'='merge-on-read'); +alter table ice_delete set partition spec (bucket(5, i)); +delete from ice_delete; +---- CATCH +AnalysisException: Cannot execute DELETE/UPDATE statement on partitioned Iceberg table: $DATABASE.ice_delete +==== diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index 028c6859b..e8c2ed522 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -1138,6 +1138,63 @@ class TestIcebergV2Table(IcebergTestSuite): self.run_test_case('QueryTest/iceberg-tablesample-v2', vector, use_db="functional_parquet") - def test_metadata_tables(self, vector, unique_database): + def test_metadata_tables(self, vector): self.run_test_case('QueryTest/iceberg-metadata-tables', vector, use_db="functional_parquet") + + def test_delete(self, vector, unique_database): + self.run_test_case('QueryTest/iceberg-delete', vector, + unique_database) + + @SkipIfFS.hive + def test_delete_hive_read(self, vector, unique_database): + ice_delete = unique_database + ".ice_delete" + self.execute_query("""CREATE TABLE {} (i int, s string) + STORED BY ICEBERG + TBLPROPERTIES('format-version'='2')""".format(ice_delete)) + self.execute_query("INSERT INTO {} VALUES (1, 'one')".format(ice_delete)) + self.execute_query("INSERT INTO {} VALUES (2, 'two')".format(ice_delete)) + self.execute_query("INSERT INTO {} VALUES (3, 'three')".format(ice_delete)) + self.execute_query("DELETE FROM {} WHERE i = 2".format(ice_delete)) + + # Hive needs table property 'format-version' explicitly set + self.run_stmt_in_hive("ALTER TABLE {} SET TBLPROPERTIES('format-version'='2')".format( + ice_delete)) + hive_output = self.run_stmt_in_hive("SELECT * FROM {} ORDER BY i".format(ice_delete)) + expected_output = "ice_delete.i,ice_delete.s\n1,one\n3,three\n" + assert hive_output == expected_output + + ice_lineitem = unique_database + ".linteitem_ice" + self.execute_query("""CREATE TABLE {} + STORED BY 
ICEBERG + TBLPROPERTIES('format-version'='2') + AS SELECT * FROM tpch_parquet.lineitem""".format(ice_lineitem)) + self.execute_query("DELETE FROM {} WHERE l_orderkey % 5 = 0".format(ice_lineitem)) + impala_result = self.execute_query("SELECT count(*) FROM {}".format(ice_lineitem)) + assert impala_result.data[0] == "4799964" + # Hive needs table property 'format-version' explicitly set + self.run_stmt_in_hive("ALTER TABLE {} SET TBLPROPERTIES('format-version'='2')".format( + ice_lineitem)) + hive_output = self.run_stmt_in_hive("SELECT count(*) FROM {}".format(ice_lineitem)) + assert hive_output == "_c0\n4799964\n" + + @SkipIfFS.hive + def test_delete_complextypes_mixed_files(self, vector, unique_database): + ice_t = unique_database + ".ice_complex_delete" + self.run_stmt_in_hive("""create table {} + stored by iceberg stored as orc as + select * from functional_parquet.complextypestbl;""".format(ice_t)) + # Hive needs table property 'format-version' explicitly set + self.run_stmt_in_hive("ALTER TABLE {} SET TBLPROPERTIES('format-version'='2')".format( + ice_t)) + self.run_stmt_in_hive("""alter table {} + set tblproperties ('write.format.default'='parquet')""".format(ice_t)) + self.run_stmt_in_hive("""insert into {} + select * from functional_parquet.complextypestbl""".format(ice_t)) + + vector.get_value('exec_option')['expand_complex_types'] = True + self.run_test_case('QueryTest/iceberg-delete-complex', vector, + unique_database) + hive_output = self.run_stmt_in_hive("SELECT id FROM {} ORDER BY id".format(ice_t)) + # Test that Hive sees the same rows deleted. + assert hive_output == "id\n4\n5\n6\n7\n8\n"